You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/21 22:09:15 UTC
[01/10] incubator-usergrid git commit: Add new RxJava based
multi-threaded ExportApp tool, and upgrade to RxJava 1.0.12.
Repository: incubator-usergrid
Updated Branches:
refs/heads/master 897fd5083 -> 9b9f55ac2
Add new RxJava based multi-threaded ExportApp tool, and upgrade to RxJava 1.0.12.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b2bdbb54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b2bdbb54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b2bdbb54
Branch: refs/heads/master
Commit: b2bdbb5456301dbcf7131e780d968514cdd7e55d
Parents: 75ad454
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Jul 13 10:21:09 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Jul 13 10:21:09 2015 -0400
----------------------------------------------------------------------
stack/core/pom.xml | 9 +-
stack/pom.xml | 2 +-
.../org/apache/usergrid/tools/ExportApp.java | 687 +++++++++++++++++++
.../apache/usergrid/tools/ExportAppTest.java | 97 +++
4 files changed, 790 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 47fa840..f60dbc2 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -573,14 +573,15 @@
</dependency>
<dependency>
- <groupId>com.netflix.rxjava</groupId>
- <artifactId>rxjava-core</artifactId>
+ <groupId>io.reactivex</groupId>
+ <artifactId>rxjava</artifactId>
<version>${rx.version}</version>
</dependency>
+
<dependency>
- <groupId>com.netflix.rxjava</groupId>
+ <groupId>io.reactivex</groupId>
<artifactId>rxjava-math</artifactId>
- <version>${rx.version}</version>
+ <version>1.0.0</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index bdc3549..da1b62c 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -108,7 +108,7 @@
<antlr.version>3.4</antlr.version>
<tika.version>1.4</tika.version>
<metrics.version>3.0.0</metrics.version>
- <rx.version>0.19.6</rx.version>
+ <rx.version>1.0.12</rx.version>
</properties>
<licenses>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
new file mode 100644
index 0000000..ceb3ecd
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.Results;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.util.MinimalPrettyPrinter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Export application's collections.
+ */
+public class ExportApp extends ExportingToolBase {
+ static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
+
+ // we will write two types of files: entities and connections
+ BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
+ BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
+
+ static final String APPLICATION_NAME = "application";
+
+ int pollTimeoutSeconds = 10;
+
+ // limiting output threads will limit output files
+ final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(8);
+ final Scheduler scheduler = Schedulers.from( threadPoolExecutor );
+
+ Map<Thread, JsonGenerator> entityGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
+ Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
+
+ List<String> emptyFiles = new ArrayList<String>();
+
+ AtomicInteger activePollers = new AtomicInteger(0);
+ AtomicInteger entitiesQueued = new AtomicInteger(0);
+ AtomicInteger entitiesWritten = new AtomicInteger(0);
+ AtomicInteger connectionsWritten = new AtomicInteger(0);
+ AtomicInteger connectionsQueued = new AtomicInteger(0);
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ /**
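+ * Export an application's entities and connections using multiple threads.
+ * <p/>
+ * How it works:
+ * Collection names are read from the application and each collection's entities are read in parallel.
+ * Each entity is added to the entity write queue; its connections are added to the connection write queue.
+ * Write-queue pollers drain both queues and write each item as JSON, one output file per writer thread.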
+ */
+ @Override
+ public void runTool(CommandLine line) throws Exception {
+
+ String applicationName = line.getOptionValue( APPLICATION_NAME );
+
+// if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
+// try {
+// readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
+// } catch (NumberFormatException nfe) {
+// logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+// return;
+// }
+// } else {
+// readThreadCount = 20;
+// }
+
+ startSpring();
+
+ setVerbose( line );
+
+ applyOrgId( line );
+ prepareBaseOutputFileName( line );
+ outputDir = createOutputParentDir();
+ logger.info( "Export directory: " + outputDir.getAbsolutePath() );
+
+ UUID applicationId = emf.lookupApplication( applicationName );
+ final EntityManager em = emf.getEntityManager( applicationId );
+
+ // start write queue workers
+
+ EntityWritesOnSubscribe entityWritesOnSub = new EntityWritesOnSubscribe( entityWriteQueue );
+ rx.Observable entityWritesObservable = rx.Observable.create( entityWritesOnSub );
+ entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
+ public Observable<ExportEntity> call(ExportEntity exportEntity) {
+ return Observable.just(exportEntity).doOnNext(
+ new EntityWriteAction() ).subscribeOn( scheduler );
+ }
+ },10).subscribeOn( scheduler ).subscribe();
+
+ ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
+ rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
+ connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
+ public Observable<ExportConnection> call(ExportConnection connection ) {
+ return Observable.just(connection).doOnNext(
+ new ConnectionWriteAction()).subscribeOn( scheduler );
+ }
+ },10).subscribeOn( scheduler ).subscribe();
+
+ // start processing data and filling up write queues
+
+ CollectionsOnSubscribe onSubscribe = new CollectionsOnSubscribe( em );
+ rx.Observable collectionsObservable = rx.Observable.create( onSubscribe );
+ collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
+ public Observable<String> call(String collection) {
+ return Observable.just(collection).doOnNext(
+ new CollectionAction( em ) ).subscribeOn( Schedulers.io() );
+ }
+ },40).subscribeOn( Schedulers.io() ).subscribe();
+
+ // wait for write thread pollers to get started
+
+ try { Thread.sleep( 1000 ); } catch (InterruptedException ignored) {}
+
+ // wait for write-thread pollers to stop
+
+ while ( activePollers.get() > 0 ) {
+ logger.info(
+ "Active write threads: {}\n"
+ +"Entities written: {}\n"
+ +"Entities queued: {}\n"
+ +"Connections written: {}\n"
+ +"Connections queued: {}\n",
+ new Object[] {
+ activePollers.get(),
+ entitiesWritten.get(),
+ entitiesQueued.get(),
+ connectionsWritten.get(),
+ connectionsQueued.get()} );
+ try { Thread.sleep( 5000 ); } catch (InterruptedException ignored) {}
+ }
+
+ // wrap up files
+
+ for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
+ //gen.writeEndArray();
+ gen.flush();
+ gen.close();
+ }
+
+ for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
+ //gen.writeEndArray();
+ gen.flush();
+ gen.close();
+ }
+
+ for ( String fileName : emptyFiles ) {
+ File emptyFile = new File(fileName);
+ emptyFile.deleteOnExit();
+ }
+
+ }
+
+ @Override
+ @SuppressWarnings("static-access")
+ public Options createOptions() {
+
+ Options options = super.createOptions();
+
+ Option readThreads = OptionBuilder.hasArg().withType("")
+ .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
+ options.addOption( readThreads );
+
+// Option readThreads = OptionBuilder
+// .hasArg().withType(0).withDescription("Read Threads -" + READ_THREAD_COUNT).create(READ_THREAD_COUNT);
+// options.addOption( readThreads );
+
+ return options;
+ }
+
+ // ----------------------------------------------------------------------------------------
+ // reading data
+
+ /**
+ * Emits collection names found in application.
+ */
+ class CollectionsOnSubscribe implements rx.Observable.OnSubscribe<String> {
+ EntityManager em;
+
+ public CollectionsOnSubscribe( EntityManager em ) {
+ this.em = em;
+ }
+
+ public void call(Subscriber<? super String> subscriber) {
+
+ logger.info("Starting to read collections");
+
+ int count = 0;
+ try {
+ Map<String, Object> collectionMetadata = em.getApplicationCollectionMetadata();
+ for ( String collection : collectionMetadata.keySet() ) {
+ subscriber.onNext( collection );
+ count++;
+ }
+
+ } catch (Exception e) {
+ subscriber.onError( e );
+ }
+ logger.info("Done. Read {} collection names", count);
+ if ( count > 0 ) {
+ subscriber.onCompleted();
+ } else {
+ subscriber.unsubscribe();
+ }
+ }
+ }
+
+ /**
+ * Emits entities of collection.
+ */
+ class EntityOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
+ EntityManager em;
+ String collection;
+
+ public EntityOnSubscribe(EntityManager em, String collection) {
+ this.em = em;
+ this.collection = collection;
+ }
+
+ public void call(Subscriber<? super ExportEntity> subscriber) {
+
+ logger.info("Starting to read entities of collection {}", collection);
+
+ try {
+ int count = 0;
+
+ Query query = new Query();
+ query.setLimit( MAX_ENTITY_FETCH );
+
+ Results results = em.searchCollection( em.getApplicationRef(), collection, query );
+
+ while (results.size() > 0) {
+ for (Entity entity : results.getEntities()) {
+ try {
+ Set<String> dictionaries = em.getDictionaries( entity );
+ Map dictionariesByName = new HashMap<String, Map<Object, Object>>();
+ for (String dictionary : dictionaries) {
+ Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
+ if (dict.isEmpty()) {
+ continue;
+ }
+ dictionariesByName.put( dictionary, dict );
+ }
+ ExportEntity exportEntity = new ExportEntity(
+ em.getApplication().getApplicationName(),
+ entity, dictionariesByName );
+ subscriber.onNext( exportEntity );
+ count++;
+
+ } catch (Exception e) {
+ logger.error("Error reading entity " + entity.getUuid() +" from collection " + collection);
+ }
+ }
+ if (results.getCursor() == null) {
+ break;
+ }
+ query.setCursor( results.getCursor() );
+ results = em.searchCollection( em.getApplicationRef(), collection, query );
+ }
+
+ logger.info("Done. Read {} entities", count);
+ if ( count > 0 ) {
+ subscriber.onCompleted();
+ } else {
+ subscriber.unsubscribe();
+ }
+
+ } catch ( Exception e ) {
+ subscriber.onError(e);
+ }
+ }
+ }
+
+ /**
+ * Emits connections of an entity.
+ */
+ class ConnectionsOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
+ EntityManager em;
+ ExportEntity exportEntity;
+
+ public ConnectionsOnSubscribe(EntityManager em, ExportEntity exportEntity) {
+ this.em = em;
+ this.exportEntity = exportEntity;
+ }
+
+ public void call(Subscriber<? super ExportConnection> subscriber) {
+
+ logger.info("Starting to read connections for entity type {}", exportEntity.getEntity().getType());
+
+ int count = 0;
+
+ try {
+ Set<String> connectionTypes = em.getConnectionTypes( exportEntity.getEntity() );
+ for (String connectionType : connectionTypes) {
+
+ Results results = em.getConnectedEntities(
+ exportEntity.getEntity().getUuid(), connectionType, null, Results.Level.CORE_PROPERTIES );
+
+ for (Entity connectedEntity : results.getEntities()) {
+ try {
+ ExportConnection connection = new ExportConnection(
+ em.getApplication().getApplicationName(),
+ connectionType,
+ exportEntity.getEntity().getUuid(),
+ connectedEntity.getUuid());
+ subscriber.onNext( connection );
+ count++;
+
+ } catch (Exception e) {
+ logger.error( "Error reading connection entity "
+ + exportEntity.getEntity().getUuid() + " -> " + connectedEntity.getType());
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ subscriber.onError( e );
+ }
+
+ logger.info("Done. Read {} connections", count);
+ if ( count > 0 ) {
+ subscriber.onCompleted();
+ } else {
+ subscriber.unsubscribe();
+ }
+ }
+ }
+
+ /**
+ * Process collection by starting processing of its entities.
+ */
+ class CollectionAction implements Action1<String> {
+ EntityManager em;
+
+ public CollectionAction( EntityManager em ) {
+ this.em = em;
+ }
+
+ public void call(String collection) {
+
+ // process entities of collection in parallel
+ EntityOnSubscribe onSubscribe = new EntityOnSubscribe( em, collection );
+ rx.Observable entityObservable = rx.Observable.create( onSubscribe );
+ entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
+ public Observable<ExportEntity> call(ExportEntity exportEntity) {
+ return Observable.just(exportEntity).doOnNext(
+ new EntityAction( em ) ).subscribeOn( Schedulers.io() );
+ }
+ }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+ }
+ }
+
+ /**
+ * Process entity by adding it to entityWriteQueue and starting processing of its connections.
+ */
+ class EntityAction implements Action1<ExportEntity> {
+ EntityManager em;
+
+ public EntityAction( EntityManager em ) {
+ this.em = em;
+ }
+
+ public void call(ExportEntity exportEntity) {
+ //logger.debug( "Processing entity: " + exportEntity.getEntity().getUuid() );
+
+ entityWriteQueue.add( exportEntity );
+ entitiesQueued.getAndIncrement();
+
+ // if entity has connections, process them in parallel
+ try {
+ Results connectedEntities = em.getConnectedEntities(
+ exportEntity.getEntity().getUuid(), null, null, Results.Level.CORE_PROPERTIES );
+
+ if ( !connectedEntities.isEmpty() ) {
+ ConnectionsOnSubscribe onSubscribe = new ConnectionsOnSubscribe( em, exportEntity );
+ rx.Observable entityObservable = rx.Observable.create( onSubscribe );
+
+ entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
+ public Observable<ExportConnection> call(ExportConnection connection) {
+ return Observable.just(connection).doOnNext(
+ new ConnectionsAction() ).subscribeOn( Schedulers.io() );
+ }
+ }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+ }
+
+ } catch (Exception e) {
+ throw new RuntimeException( "Error getting connections", e );
+ }
+ }
+ }
+
+ /**
+ * Process connection by adding it to connectionWriteQueue.
+ */
+ class ConnectionsAction implements Action1<ExportConnection> {
+
+ public void call(ExportConnection conn) {
+ //logger.debug( "Processing connections for entity: " + conn.getSourceUuid() );
+ connectionWriteQueue.add(conn);
+ connectionsQueued.getAndIncrement();
+ }
+ }
+
+
+ // ----------------------------------------------------------------------------------------
+ // writing data
+
+ /**
+ * Emits entities to be written.
+ */
+ class EntityWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
+ BlockingQueue<ExportEntity> queue;
+
+ public EntityWritesOnSubscribe( BlockingQueue<ExportEntity> queue ) {
+ this.queue = queue;
+ }
+
+ public void call(Subscriber<? super ExportEntity> subscriber) {
+ int count = 0;
+
+ while ( true ) {
+ ExportEntity entity = null;
+ try {
+ //logger.debug( "Wrote {}. Polling for entity to write...", count );
+ activePollers.getAndIncrement();
+ entity = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
+ } catch (InterruptedException e) {
+ logger.error("Entity poll interrupted", e);
+ continue;
+ } finally {
+ activePollers.getAndDecrement();
+ }
+ if ( entity == null ) {
+ break;
+ }
+ subscriber.onNext( entity );
+ count++;
+ }
+
+ logger.info("Done. Wrote {} entities", count);
+ if ( count > 0 ) {
+ subscriber.onCompleted();
+ } else {
+ subscriber.unsubscribe();
+ }
+ }
+ }
+
+ /**
+ * Emits connections to be written.
+ */
+ class ConnectionWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
+ BlockingQueue<ExportConnection> queue;
+
+ public ConnectionWritesOnSubscribe( BlockingQueue<ExportConnection> queue ) {
+ this.queue = queue;
+ }
+
+ public void call(Subscriber<? super ExportConnection> subscriber) {
+ int count = 0;
+
+ while ( true ) {
+ ExportConnection connection = null;
+ try {
+ //logger.debug( "Wrote {}. Polling for connection to write", count );
+ activePollers.getAndIncrement();
+ connection = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
+ } catch (InterruptedException e) {
+ logger.error("Connection poll interrupted", e);
+ continue;
+ } finally {
+ activePollers.getAndDecrement();
+ }
+ if ( connection == null ) {
+ break;
+ }
+ subscriber.onNext( connection );
+ count++;
+ }
+
+ logger.info("Done. Wrote {} connections", count);
+ if ( count > 0 ) {
+ subscriber.onCompleted();
+ } else {
+ subscriber.unsubscribe();
+ }
+ }
+ }
+
+ /**
+ * Writes entities to JSON file.
+ */
+ class EntityWriteAction implements Action1<ExportEntity> {
+
+ public void call(ExportEntity entity) {
+
+ boolean wroteData = false;
+
+ String fileName = "target/" + Thread.currentThread().getName() + ".ude";
+
+ JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
+ if ( gen == null ) {
+
+ // no generator for this thread yet, so open a new output file; entities are written one JSON object per line
+ try {
+ gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+ } catch (IOException e) {
+ throw new RuntimeException("Error opening output file: " + fileName, e);
+ }
+ gen.setPrettyPrinter( new MinimalPrettyPrinter(""));
+ gen.setCodec( mapper );
+ entityGeneratorsByThread.put( Thread.currentThread(), gen );
+ }
+
+ try {
+ gen.writeObject( entity );
+ gen.writeRaw('\n');
+ entitiesWritten.getAndIncrement();
+ wroteData = true;
+
+ } catch (IOException e) {
+ throw new RuntimeException("Error writing to output file: " + fileName, e);
+ }
+
+ if ( !wroteData ) {
+ emptyFiles.add( fileName );
+ }
+ }
+ }
+
+ /**
+ * Writes connection to JSON file.
+ */
+ class ConnectionWriteAction implements Action1<ExportConnection> {
+
+ public void call(ExportConnection conn) {
+
+ boolean wroteData = false;
+
+ String fileName = "target/" + Thread.currentThread().getName() + ".ugc";
+
+ JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
+ if ( gen == null ) {
+
+ // no generator for this thread yet, so open a new output file; connections are written one JSON object per line
+ try {
+ gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+ } catch (IOException e) {
+ throw new RuntimeException("Error opening output file: " + fileName, e);
+ }
+ gen.setPrettyPrinter( new MinimalPrettyPrinter(""));
+ gen.setCodec( mapper );
+ connectionGeneratorsByThread.put( Thread.currentThread(), gen );
+ }
+
+ try {
+ gen.writeObject( conn );
+ gen.writeRaw('\n');
+ connectionsWritten.getAndIncrement();
+ wroteData = true;
+
+ } catch (IOException e) {
+ throw new RuntimeException("Error writing to output file: " + fileName, e);
+ }
+
+ if ( !wroteData ) {
+ emptyFiles.add( fileName );
+ }
+ }
+ }
+
+}
+
+class ExportEntity {
+ private String application;
+ private Entity entity;
+ private Map<String, Object> dictionaries;
+ public ExportEntity( String application, Entity entity, Map<String, Object> dictionaries ) {
+ this.application = application;
+ this.entity = entity;
+ this.dictionaries = dictionaries;
+ }
+
+ public String getApplication() {
+ return application;
+ }
+
+ public void setApplication(String application) {
+ this.application = application;
+ }
+
+ public Entity getEntity() {
+ return entity;
+ }
+
+ public void setEntity(Entity entity) {
+ this.entity = entity;
+ }
+
+ public Map<String, Object> getDictionaries() {
+ return dictionaries;
+ }
+
+ public void setDictionaries(Map<String, Object> dictionaries) {
+ this.dictionaries = dictionaries;
+ }
+}
+
+class ExportConnection {
+ private String application;
+ private String connectionType;
+ private UUID sourceUuid;
+ private UUID targetUuid;
+ public ExportConnection(String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+ this.application = application;
+ this.connectionType = connectionType;
+ this.sourceUuid = sourceUuid;
+ this.targetUuid = targetUuid;
+ }
+
+ public String getApplication() {
+ return application;
+ }
+
+ public void setApplication(String application) {
+ this.application = application;
+ }
+
+ public String getConnectionType() {
+ return connectionType;
+ }
+
+ public void setConnectionType(String connectionType) {
+ this.connectionType = connectionType;
+ }
+
+ public UUID getSourceUuid() {
+ return sourceUuid;
+ }
+
+ public void setSourceUuid(UUID sourceUuid) {
+ this.sourceUuid = sourceUuid;
+ }
+
+ public UUID getTargetUuid() {
+ return targetUuid;
+ }
+
+ public void setTargetUuid(UUID targetUuid) {
+ this.targetUuid = targetUuid;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
new file mode 100644
index 0000000..14b9311
--- /dev/null
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.ServiceITSetup;
+import org.apache.usergrid.ServiceITSetupImpl;
+import org.apache.usergrid.ServiceITSuite;
+import org.apache.usergrid.management.ApplicationInfo;
+import org.apache.usergrid.management.OrganizationOwnerInfo;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.junit.ClassRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+
+public class ExportAppTest {
+ static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
+
+ @ClassRule
+ public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
+
+ @org.junit.Test
+ public void testBasicOperation() throws Exception {
+
+ String rand = RandomStringUtils.randomAlphanumeric( 10 );
+
+ // create app with some data
+
+ OrganizationOwnerInfo orgInfo = setup.getMgmtSvc().createOwnerAndOrganization(
+ "org_" + rand, "user_" + rand, rand.toUpperCase(), rand + "@example.com", rand );
+
+ ApplicationInfo appInfo = setup.getMgmtSvc().createApplication(
+ orgInfo.getOrganization().getUuid(), "app_" + rand );
+
+ EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
+
+ // create 10 connected things
+
+ List<Entity> connectedThings = new ArrayList<Entity>();
+ String connectedType = "connected_thing";
+ em.createApplicationCollection(connectedType);
+ for ( int j=0; j<10; j++) {
+ final String name = "connected_thing_" + j;
+ connectedThings.add( em.create( connectedType, new HashMap<String, Object>() {{
+ put( "name", name );
+ }} ) );
+ }
+
+ // create 10 collections of 10 things, every other thing is connected to the connected things
+
+ for ( int i=0; i<10; i++) {
+ String type = "thing_"+i;
+ em.createApplicationCollection(type);
+ for ( int j=0; j<10; j++) {
+ final String name = "thing_" + j;
+ Entity source = em.create(type, new HashMap<String, Object>() {{ put("name", name); }});
+ if ( j % 2 == 0 ) {
+ for ( Entity target : connectedThings ) {
+ em.createConnection( source, "has", target );
+ }
+ }
+ }
+ }
+
+ // export to file
+
+ String directoryName = "./target/export" + rand;
+
+ ExportApp exportApp = new ExportApp();
+ exportApp.startTool( new String[]{
+ "-application", appInfo.getName(),
+ "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
+ "-outputDir", directoryName
+ }, false );
+
+ }
+}
\ No newline at end of file
[09/10] incubator-usergrid git commit: Added data creator to generate
data for ExportApp testing.
Posted by sf...@apache.org.
Added data creator to generate data for ExportApp testing.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a6e68a0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a6e68a0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a6e68a0a
Branch: refs/heads/master
Commit: a6e68a0adc571b0a30138d4cb71b5e0e49a42c0a
Parents: 2748347
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 21 14:59:59 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 21 14:59:59 2015 -0400
----------------------------------------------------------------------
stack/pom.xml | 2 +-
stack/tools/pom.xml | 6 +
.../org/apache/usergrid/tools/ExportApp.java | 17 +-
.../usergrid/tools/ExportDataCreator.java | 244 +++++++++++++++----
.../org/apache/usergrid/tools/ToolBase.java | 2 +-
.../apache/usergrid/tools/ExportAppTest.java | 65 +----
6 files changed, 223 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index da1b62c..0e1b32d 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -103,7 +103,7 @@
<org.springframework.version>3.1.2.RELEASE</org.springframework.version>
<shiro-version>1.2.0</shiro-version>
<slf4j-version>1.6.1</slf4j-version>
- <snakeyaml-version>1.8</snakeyaml-version>
+ <snakeyaml-version>1.9</snakeyaml-version>
<tomcat-version>7.0.42</tomcat-version>
<antlr.version>3.4</antlr.version>
<tika.version>1.4</tika.version>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/pom.xml
----------------------------------------------------------------------
diff --git a/stack/tools/pom.xml b/stack/tools/pom.xml
index 4be3232..3402612 100644
--- a/stack/tools/pom.xml
+++ b/stack/tools/pom.xml
@@ -249,5 +249,11 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.codearte.jfairy</groupId>
+ <artifactId>jfairy</artifactId>
+ <version>0.4.3</version>
+ </dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index b2da0ea..db975e6 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -102,7 +102,7 @@ public class ExportApp extends ExportingToolBase {
*/
@Override
public void runTool(CommandLine line) throws Exception {
-
+
applicationName = line.getOptionValue( APPLICATION_NAME );
if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
@@ -122,8 +122,11 @@ public class ExportApp extends ExportingToolBase {
logger.info( "Export directory: " + outputDir.getAbsolutePath() );
startSpring();
-
+
UUID applicationId = emf.lookupApplication( applicationName );
+ if (applicationId == null) {
+ throw new RuntimeException( "Cannot find application " + applicationName );
+ }
final EntityManager em = emf.getEntityManager( applicationId );
organizationName = em.getApplication().getOrganizationName();
@@ -133,8 +136,9 @@ public class ExportApp extends ExportingToolBase {
Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
-
+
public Observable<ExportEntity> call(String collection) {
+
return Observable.create( new EntityObservable( em, collection ) )
.doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
}
@@ -142,16 +146,17 @@ public class ExportApp extends ExportingToolBase {
}, writeThreadCount ).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
public Observable<ExportConnection> call(ExportEntity exportEntity) {
+
return Observable.create( new ConnectionsObservable( em, exportEntity ) )
.doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
}
- }, writeThreadCount)
+ }, writeThreadCount )
.doOnCompleted( new FileWrapUpAction() )
.toBlocking().last();
}
-
-
+
+
// ----------------------------------------------------------------------------------------
// reading data
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java
index a3d6517..6fa4896 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java
@@ -17,90 +17,232 @@
package org.apache.usergrid.tools;
-import java.util.UUID;
-
-import org.apache.usergrid.management.ManagementService;
+import io.codearte.jfairy.Fairy;
+import io.codearte.jfairy.producer.company.Company;
+import io.codearte.jfairy.producer.person.Person;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.management.ApplicationInfo;
+import org.apache.usergrid.management.OrganizationInfo;
import org.apache.usergrid.management.OrganizationOwnerInfo;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.entities.Activity;
-import org.apache.usergrid.persistence.entities.User;
+import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
-import static org.junit.Assert.assertNotNull;
+import java.util.*;
/**
- * Simple class to create test for for exporting
- *
- * @author tnine
+ * Create an app full of users and data.
*/
-public class ExportDataCreator {
+public class ExportDataCreator extends ToolBase {
+
+ public static final String APP_NAME = "application";
+ public static final String ORG_NAME = "organization";
+ public static final String NUM_USERS = "users";
+ public static final String NUM_COLLECTIONS = "collections";
+ public static final String NUM_ENTITIES = "entities";
+ public static final String ADMIN_USERNAME = "username";
+ public static final String ADMIN_PASSWORD = "password";
- private EntityManagerFactory emf;
+ public String appName = "test-app";
+ public String orgName = "test-organization";
+ public int numUsers = 100;
+ public int numCollections = 20;
+ public int numEntities = 100;
+ public String adminUsername = "adminuser";
+ public String adminPassword = "test";
- private ManagementService managementService;
+ @Override
+ public void runTool(CommandLine line) throws Exception {
- /**
- * @param emf
- * @param managementService
- */
- public ExportDataCreator( EntityManagerFactory emf, ManagementService managementService ) {
- super();
- this.emf = emf;
- this.managementService = managementService;
+ startSpring();
+
+ setVerbose( line );
+
+ if (line.hasOption( APP_NAME )) {
+ appName = line.getOptionValue( APP_NAME );
+ }
+ if (line.hasOption( ORG_NAME )) {
+ orgName = line.getOptionValue( ORG_NAME );
+ }
+ if (line.hasOption( NUM_USERS )) {
+ numUsers = Integer.parseInt( line.getOptionValue( NUM_USERS ) );
+ }
+ if (line.hasOption( NUM_COLLECTIONS )) {
+ numCollections = Integer.parseInt( line.getOptionValue( NUM_COLLECTIONS ) );
+ }
+ if (line.hasOption( NUM_ENTITIES )) {
+ numEntities = Integer.parseInt( line.getOptionValue( NUM_ENTITIES ) );
+ }
+ if (line.hasOption( ADMIN_USERNAME )) {
+ adminUsername = line.getOptionValue( ADMIN_USERNAME );
+ }
+ if (line.hasOption( ADMIN_PASSWORD )) {
+ adminPassword = line.getOptionValue( ADMIN_PASSWORD );
+ }
+
+ createTestData();
}
- public void createTestData() throws Exception {
+ @Override
+ @SuppressWarnings("static-access")
+ public Options createOptions() {
- String orgName = "testexportorg";
+ Options options = super.createOptions();
- //nothing to do
- if ( managementService.getOrganizationByName( orgName ) != null ) {
- return;
- }
+ Option appName = OptionBuilder.hasArg()
+ .withDescription( "Application name to use" ).create( APP_NAME );
- OrganizationOwnerInfo orgInfo = managementService
- .createOwnerAndOrganization( orgName, "textExportUser@apigee.com", "Test User",
- "textExportUser@apigee.com", "password", true, false );
+ Option orgName = OptionBuilder.hasArg()
+ .withDescription( "Organization to use (will create if not present)" ).create( ORG_NAME );
- UUID appId = managementService.createApplication( orgInfo.getOrganization().getUuid(), "application" ).getId();
+ Option numUsers = OptionBuilder.hasArg()
+ .withDescription( "Number of users create (in addition to users)" ).create( NUM_USERS );
- EntityManager em = emf.getEntityManager( appId );
+ Option numCollection = OptionBuilder.hasArg()
+ .withDescription( "Number of collections to create (in addition to users)" ).create( NUM_COLLECTIONS );
- User first = new User();
- first.setUsername( "first" );
- first.setEmail( "first@usergrid.com" );
+ Option numEntities = OptionBuilder.hasArg()
+ .withDescription( "Number of entities to create per collection" ).create( NUM_ENTITIES );
- Entity firstUserEntity = em.create( first );
+ Option adminUsername = OptionBuilder.hasArg()
+ .withDescription( "Admin Username" ).create( ADMIN_USERNAME );
- assertNotNull( firstUserEntity );
+ Option adminPassword = OptionBuilder.hasArg()
+ .withDescription( "Admin Password" ).create( ADMIN_PASSWORD );
- User second = new User();
- second.setUsername( "second" );
- second.setEmail( "second@usergrid.com" );
+ options.addOption( appName );
+ options.addOption( orgName );
+ options.addOption( numUsers );
+ options.addOption( numCollection );
+ options.addOption( numEntities );
+ options.addOption( adminUsername );
+ options.addOption( adminPassword );
- Entity secondUserEntity = em.create( second );
+ return options;
+ }
- assertNotNull( secondUserEntity );
- em.createConnection( firstUserEntity, "likes", secondUserEntity );
+ public void createTestData() throws Exception {
- em.createConnection( secondUserEntity, "dislikes", firstUserEntity );
+ OrganizationInfo orgInfo = managementService.getOrganizationByName( orgName );
- // now create some activities and put them into the user stream
+ if (orgInfo == null) {
+ OrganizationOwnerInfo ownerInfo = managementService.createOwnerAndOrganization(
+ orgName, adminUsername + "@example.com", adminUsername,
+ adminUsername + "@example.com", adminPassword, true, false );
+ orgInfo = ownerInfo.getOrganization();
+ }
- Activity activity = new Activity();
+ ApplicationInfo appInfo = managementService.getApplicationInfo( orgName + "/" + appName );
- Activity.ActivityObject actor = new Activity.ActivityObject();
- actor.setEntityType( "user" );
- actor.setId( firstUserEntity.getUuid().toString() );
+ if (appInfo == null) {
+ UUID appId = managementService.createApplication( orgInfo.getUuid(), appName ).getId();
+ appInfo = managementService.getApplicationInfo( appId );
+ }
- activity.setActor( actor );
- activity.setVerb( "POST" );
+ EntityManager em = emf.getEntityManager( appInfo.getId() );
+
+ Fairy fairy = Fairy.create();
+
+ List<Entity> users = new ArrayList<Entity>( numUsers );
+
+ for (int i = 0; i < numUsers; i++) {
+
+ final Person person = fairy.person();
+ Entity userEntity = null;
+ try {
+ final Map<String, Object> userMap = new HashMap<String, Object>() {{
+ put( "username", person.username() );
+ put( "password", person.password() );
+ put( "email", person.email() );
+ put( "companyEmail", person.companyEmail() );
+ put( "dateOfBirth", person.dateOfBirth() );
+ put( "firstName", person.firstName() );
+ put( "lastName", person.lastName() );
+ put( "nationalIdentificationNumber", person.nationalIdentificationNumber() );
+ put( "telephoneNumber", person.telephoneNumber() );
+ put( "passportNumber", person.passportNumber() );
+ put( "address", person.getAddress() );
+ }};
+
+ userEntity = em.create( "user", userMap );
+ users.add( userEntity );
+
+ } catch (DuplicateUniquePropertyExistsException e) {
+ logger.error( "Dup user generated: " + person.username() );
+ continue;
+ } catch (Exception e) {
+ logger.error("Error creating user", e);
+ continue;
+ }
+
+ final Company company = person.getCompany();
+ try {
+ EntityRef ref = em.getAlias( "company", company.name() );
+ Entity companyEntity = (ref == null) ? null : em.get( ref );
+
+ // create company if it does not exist yet
+ if ( companyEntity == null ) {
+ final Map<String, Object> companyMap = new HashMap<String, Object>() {{
+ put( "name", company.name() );
+ put( "domain", company.domain() );
+ put( "email", company.email() );
+ put( "url", company.url() );
+ put( "vatIdentificationNumber", company.vatIdentificationNumber() );
+ }};
+ companyEntity = em.create( "company", companyMap );
+ } else {
+ logger.info("Company {} already exists", company.name());
+ }
+
+ em.createConnection( userEntity, "employer", companyEntity );
+
+ } catch (DuplicateUniquePropertyExistsException e) {
+ logger.error( "Dup company generated {} property={}", company.name(), e.getPropertyName() );
+ continue;
+ } catch (Exception e) {
+ logger.error("Error creating or connecting company", e);
+ continue;
+ }
+
+ try {
+ for (int j = 0; j < 5; j++) {
+ Activity activity = new Activity();
+ Activity.ActivityObject actor = new Activity.ActivityObject();
+ actor.setEntityType( "user" );
+ actor.setId( userEntity.getUuid().toString() );
+ activity.setActor( actor );
+ activity.setVerb( "POST" );
+ activity.setContent( "User " + person.username() + " generated a random string "
+ + RandomStringUtils.randomAlphanumeric( 5 ) );
+ em.createItemInCollection( userEntity, "activities", "activity", activity.getProperties() );
+ }
+
+ if (users.size() > 10) {
+ for (int j = 0; j < 5; j++) {
+ try {
+ em.createConnection( userEntity, "associate", users.get( (int) (Math.random() * users.size()) ) );
+ } catch (Exception e) {
+ logger.error( "Error connecting user to user: " + e.getMessage() );
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ logger.error("Error creating activities", e);
+ continue;
+ }
- em.createItemInCollection( firstUserEntity, "activities", "activity", activity.getProperties() );
+ }
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
index 63187a4..3c427e1 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
@@ -116,7 +116,7 @@ public abstract class ToolBase {
public void printCliHelp( String message ) {
System.out.println( message );
HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp( "java -jar usergrid-tools-0.0.1-SNAPSHOT.jar " + getToolName(), createOptions() );
+ formatter.printHelp( "java -jar usergrid-tools-1.0.2.jar " + getToolName(), createOptions() );
System.exit( -1 );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index a1e3f6b..446aa91 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -63,65 +63,23 @@ public class ExportAppTest {
// create app with some data
- OrganizationOwnerInfo orgInfo = setup.getMgmtSvc().createOwnerAndOrganization(
- "org_" + rand, "user_" + rand, rand.toUpperCase(), rand + "@example.com", rand );
-
- ApplicationInfo appInfo = setup.getMgmtSvc().createApplication(
- orgInfo.getOrganization().getUuid(), "app_" + rand );
-
- final EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
+ String orgName = "org_" + rand;
+ String appName = "app_" + rand;
+
+ ExportDataCreator creator = new ExportDataCreator();
+ creator.startTool( new String[] {
+ "-organization", orgName,
+ "-application", appName,
+ "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort()
+ }, false);
- // create connected things
-
- final List<Entity> connectedThings = new ArrayList<Entity>();
- String connectedType = "connected_thing";
- em.createApplicationCollection(connectedType);
- for ( int j=0; j<NUM_CONNECTIONS; j++) {
- final String name = "connected_thing_" + j;
- connectedThings.add( em.create( connectedType, new HashMap<String, Object>() {{
- put( "name", name );
- }} ) );
- }
-
- // create collections of things, every other thing is connected to the connected things
-
- final AtomicInteger entitiesCount = new AtomicInteger(0);
- final AtomicInteger connectionCount = new AtomicInteger(0);
-
- ExecutorService execService = Executors.newFixedThreadPool( 50);
- final Scheduler scheduler = Schedulers.from( execService );
-
- for (int i = 0; i < NUM_COLLECTIONS; i++) {
-
- final String type = "thing_" + i;
- em.createApplicationCollection( type );
- connectionCount.getAndIncrement();
-
- for (int j = 0; j < NUM_ENTITIES; j++) {
- final String name = "thing_" + j;
- final Entity source = em.create(
- type, new HashMap<String, Object>() {{
- put( "name", name );
- }} );
- entitiesCount.getAndIncrement();
-
- for (Entity target : connectedThings) {
- em.createConnection( source, "has", target );
- connectionCount.getAndIncrement();
- }
- }
- }
-
- logger.info( "Done. Created {} entities and {} connections", entitiesCount.get(), connectionCount.get() );
-
long start = System.currentTimeMillis();
String directoryName = "target/export" + rand;
ExportApp exportApp = new ExportApp();
exportApp.startTool( new String[]{
- "-application", appInfo.getName(),
- "-readThreads", "100",
+ "-application", orgName + "/" + appName,
"-writeThreads", "100",
"-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
"-outputDir", directoryName
@@ -137,8 +95,7 @@ public class ExportAppTest {
File exportDir1 = new File(directoryName + "1");
exportApp.startTool( new String[]{
- "-application", appInfo.getName(),
- "-readThreads", "1",
+ "-application", orgName + "/" + appName,
"-writeThreads", "1",
"-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
"-outputDir", directoryName + "1"
[04/10] incubator-usergrid git commit: Some reformatting. Also
eliminating use of subscriber.unsubscribe(). All observables need to wrap up
with onCompleted().
Posted by sf...@apache.org.
Some reformatting. Also eliminating use of subscriber.unsubscribe(). All observables need to wrap up with onCompleted().
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a25f8ebc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a25f8ebc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a25f8ebc
Branch: refs/heads/master
Commit: a25f8ebc2877681897a5ef945d39b685c2d3f9fa
Parents: 7a870d6
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 15:31:07 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 15:31:07 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportApp.java | 113 ++++++++++---------
stack/tools/src/main/resources/log4j.properties | 5 +-
.../apache/usergrid/tools/ExportAppTest.java | 89 ++++-----------
3 files changed, 81 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a25f8ebc/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index 59509c0..c302a74 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -16,7 +16,6 @@
*/
package org.apache.usergrid.tools;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
@@ -82,8 +81,30 @@ public class ExportApp extends ExportingToolBase {
// set via CLI
int readThreadCount = 80;
int writeThreadCount = 10; // limiting write will limit output files
-
+
+ @Override
+ @SuppressWarnings("static-access")
+ public Options createOptions() {
+
+ Options options = super.createOptions();
+
+ Option appNameOption = OptionBuilder.hasArg().withType("")
+ .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
+ options.addOption( appNameOption );
+
+ Option readThreadsOption = OptionBuilder.hasArg().withType(0)
+ .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+ options.addOption( readThreadsOption );
+
+ Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
+ .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+ options.addOption( writeThreadsOption );
+
+ return options;
+ }
+
+
/**
* Tool entry point.
*/
@@ -110,25 +131,25 @@ public class ExportApp extends ExportingToolBase {
}
}
- ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
- readScheduler = Schedulers.from( readThreadPoolExecutor );
-
- ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
- writeScheduler = Schedulers.from( writeThreadPoolExecutor );
-
- startSpring();
-
setVerbose( line );
applyOrgId( line );
prepareBaseOutputFileName( line );
outputDir = createOutputParentDir();
logger.info( "Export directory: " + outputDir.getAbsolutePath() );
-
+
+ startSpring();
+
UUID applicationId = emf.lookupApplication( applicationName );
final EntityManager em = emf.getEntityManager( applicationId );
organizationName = em.getApplication().getOrganizationName();
+ ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
+ readScheduler = Schedulers.from( readThreadPoolExecutor );
+
+ ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
+ writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+
Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
@@ -151,30 +172,11 @@ public class ExportApp extends ExportingToolBase {
.toBlocking().last();
}
- @Override
- @SuppressWarnings("static-access")
- public Options createOptions() {
-
- Options options = super.createOptions();
-
- Option appNameOption = OptionBuilder.hasArg().withType("")
- .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
- options.addOption( appNameOption );
-
- Option readThreadsOption = OptionBuilder.hasArg().withType(0)
- .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
- options.addOption( readThreadsOption );
-
- Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
- .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
- options.addOption( writeThreadsOption );
-
- return options;
- }
// ----------------------------------------------------------------------------------------
// reading data
+
/**
* Emits collection names found in application.
*/
@@ -198,16 +200,13 @@ public class ExportApp extends ExportingToolBase {
} catch (Exception e) {
subscriber.onError( e );
}
- if ( count > 0 ) {
- subscriber.onCompleted();
- logger.info( "Completed. Read {} collection names", count );
- } else {
- subscriber.unsubscribe();
- logger.info( "No collections found" );
- }
+
+ subscriber.onCompleted();
+ logger.info( "Completed. Read {} collection names", count );
}
}
+
/**
* Emits entities of collection.
*/
@@ -267,13 +266,8 @@ public class ExportApp extends ExportingToolBase {
results = em.searchCollection( em.getApplicationRef(), collection, query );
}
- if ( count > 0 ) {
- subscriber.onCompleted();
- logger.info("Completed collection {}. Read {} entities", collection, count);
- } else {
- logger.info("Completed collection {} empty", collection );
- subscriber.unsubscribe();
- }
+ subscriber.onCompleted();
+ logger.info("Completed collection {}. Read {} entities", collection, count);
} catch ( Exception e ) {
subscriber.onError(e);
@@ -281,6 +275,7 @@ public class ExportApp extends ExportingToolBase {
}
}
+
/**
* Emits connections of an entity.
*/
@@ -331,19 +326,17 @@ public class ExportApp extends ExportingToolBase {
subscriber.onError( e );
}
- if ( count > 0 ) {
- subscriber.onCompleted();
- logger.info("Completed entity {} type {} connections count {}",
- new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
-
- } else {
- subscriber.unsubscribe();
- logger.info( "Entity {} type {} has no connections",
- exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
- }
+ subscriber.onCompleted();
+ logger.info("Completed entity {} type {} connections count {}",
+ new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
}
}
+
+ // ----------------------------------------------------------------------------------------
+ // writing data
+
+
/**
* Writes entities to JSON file.
*/
@@ -381,6 +374,7 @@ public class ExportApp extends ExportingToolBase {
}
}
+
/**
* Writes connection to JSON file.
*/
@@ -418,6 +412,7 @@ public class ExportApp extends ExportingToolBase {
}
}
+
private class FileWrapUpAction implements Action0 {
@Override
public void call() {
@@ -448,6 +443,10 @@ public class ExportApp extends ExportingToolBase {
}
}
+
+/**
+ * Represents entity data to be serialized to JSON.
+ */
class ExportEntity {
private String organization;
private String application;
@@ -493,6 +492,10 @@ class ExportEntity {
}
}
+
+/**
+ * Represents connection data to be serialized to JSON.
+ */
class ExportConnection {
private String organization;
private String application;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a25f8ebc/stack/tools/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/resources/log4j.properties b/stack/tools/src/main/resources/log4j.properties
index 6cf0a92..def47b4 100644
--- a/stack/tools/src/main/resources/log4j.properties
+++ b/stack/tools/src/main/resources/log4j.properties
@@ -18,7 +18,7 @@
# and the pattern to %c instead of %l. (%l is slower.)
# output messages into a rolling log file as well as stdout
-log4j.rootLogger=WARN,stdout
+log4j.rootLogger=ERROR,stdout
# stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
@@ -26,7 +26,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) [%c] - %m%n
-log4j.logger.org.apache.usergrid.tools=INFO
+log4j.logger.org.apache.usergrid=INFO
+log4j.logger.org.apache.usergrid.tools=DEBUG
log4j.logger.org.apache.usergrid.management.cassandra=WARN
log4j.logger.org.apache.usergrid.persistence.cassandra.DB=WARN
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a25f8ebc/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index af8306f..d1c5c1f 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -27,10 +27,7 @@ import org.apache.usergrid.persistence.EntityManager;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import rx.Observable;
import rx.Scheduler;
-import rx.functions.Action1;
-import rx.functions.Func1;
import rx.schedulers.Schedulers;
import java.util.ArrayList;
@@ -44,9 +41,9 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ExportAppTest {
static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
- int NUM_COLLECTIONS = 5;
- int NUM_ENTITIES = 10;
- int NUM_CONNECTIONS = 1;
+ int NUM_COLLECTIONS = 20;
+ int NUM_ENTITIES = 200;
+ int NUM_CONNECTIONS = 5;
@ClassRule
public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
@@ -86,68 +83,25 @@ public class ExportAppTest {
ExecutorService execService = Executors.newFixedThreadPool( 50);
final Scheduler scheduler = Schedulers.from( execService );
- Observable.range( 0, NUM_COLLECTIONS ).flatMap( new Func1<Integer, Observable<?>>() {
- @Override
- public Observable<?> call(Integer i) {
-
- return Observable.just( i ).doOnNext( new Action1<Integer>() {
- @Override
- public void call(Integer i) {
-
- final String type = "thing_"+i;
- try {
- em.createApplicationCollection( type );
- connectionCount.getAndIncrement();
-
- } catch (Exception e) {
- throw new RuntimeException( "Error creating collection", e );
- }
-
- Observable.range( 0, NUM_ENTITIES ).flatMap( new Func1<Integer, Observable<?>>() {
- @Override
- public Observable<?> call(Integer j) {
- return Observable.just( j ).doOnNext( new Action1<Integer>() {
- @Override
- public void call(Integer j) {
-
- final String name = "thing_" + j;
- try {
- final Entity source = em.create(
- type, new HashMap<String, Object>() {{ put("name", name); }});
- entitiesCount.getAndIncrement();
- logger.info( "Created entity {} type {}", name, type );
-
- for ( Entity target : connectedThings ) {
- em.createConnection( source, "has", target );
- connectionCount.getAndIncrement();
- logger.info( "Created connection from entity {} type {} to {}",
- new Object[]{name, type, target.getName()} );
- }
-
-
- } catch (Exception e) {
- throw new RuntimeException( "Error creating collection", e );
- }
-
-
- }
-
- } );
-
- }
- }, 50 ).subscribeOn( scheduler ).subscribe(); // toBlocking().last();
-
- }
- } );
-
+ for (int i = 0; i < NUM_COLLECTIONS; i++) {
- }
- }, 30 ).subscribeOn( scheduler ).toBlocking().last();
+ final String type = "thing_" + i;
+ em.createApplicationCollection( type );
+ connectionCount.getAndIncrement();
+
+ for (int j = 0; j < NUM_ENTITIES; j++) {
+ final String name = "thing_" + j;
+ final Entity source = em.create(
+ type, new HashMap<String, Object>() {{
+ put( "name", name );
+ }} );
+ entitiesCount.getAndIncrement();
- while ( entitiesCount.get() < NUM_COLLECTIONS * NUM_ENTITIES ) {
- Thread.sleep( 5000 );
- logger.info( "Still working. Created {} entities and {} connections",
- entitiesCount.get(), connectionCount.get() );
+ for (Entity target : connectedThings) {
+ em.createConnection( source, "has", target );
+ connectionCount.getAndIncrement();
+ }
+ }
}
logger.info( "Done. Created {} entities and {} connections", entitiesCount.get(), connectionCount.get() );
@@ -166,8 +120,5 @@ public class ExportAppTest {
}, false );
logger.info("time = " + (System.currentTimeMillis() - start)/1000 + "s");
-
-
-
}
}
\ No newline at end of file
[07/10] incubator-usergrid git commit: Make flatmap max observables
match write thread count and use Schedulers.io() instead of a custom
readScheduler.
Posted by sf...@apache.org.
Make flatmap max observables match write thread count and use Schedulers.io() instead of a custom readScheduler.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b58390d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b58390d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b58390d9
Branch: refs/heads/master
Commit: b58390d96eb423287247774533b18e9c4ee43843
Parents: dab84e9
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 15 13:53:55 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 15 13:53:55 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportApp.java | 39 +++++---------------
.../apache/usergrid/tools/ExportAppTest.java | 12 +++---
2 files changed, 14 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b58390d9/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index c302a74..b2da0ea 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -54,7 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*
* Will create as many output files as there are writeThreads (by default: 10).
*
- * Will create two types of files: *.uge for Usegrird entities and *.ugc for entity to entity connections.
+ * Will create two types of files: *.entities for Usergrid entities and *.collections for entity to entity connections.
*
* Every line of the data files is a complete JSON object.
*/
@@ -62,7 +62,6 @@ public class ExportApp extends ExportingToolBase {
static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
static final String APPLICATION_NAME = "application";
- private static final String READ_THREAD_COUNT = "readThreads";
private static final String WRITE_THREAD_COUNT = "writeThreads";
String applicationName;
@@ -71,16 +70,13 @@ public class ExportApp extends ExportingToolBase {
AtomicInteger entitiesWritten = new AtomicInteger(0);
AtomicInteger connectionsWritten = new AtomicInteger(0);
- Scheduler readScheduler;
Scheduler writeScheduler;
ObjectMapper mapper = new ObjectMapper();
Map<Thread, JsonGenerator> entityGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
- // set via CLI
- int readThreadCount = 80;
- int writeThreadCount = 10; // limiting write will limit output files
+ int writeThreadCount = 10; // set via CLI option; limiting write will limit output files
@Override
@@ -93,10 +89,6 @@ public class ExportApp extends ExportingToolBase {
.withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
options.addOption( appNameOption );
- Option readThreadsOption = OptionBuilder.hasArg().withType(0)
- .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
- options.addOption( readThreadsOption );
-
Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
.withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
options.addOption( writeThreadsOption );
@@ -113,15 +105,6 @@ public class ExportApp extends ExportingToolBase {
applicationName = line.getOptionValue( APPLICATION_NAME );
- if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
- try {
- readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
- } catch (NumberFormatException nfe) {
- logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
- return;
- }
- }
-
if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
try {
writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
@@ -144,9 +127,6 @@ public class ExportApp extends ExportingToolBase {
final EntityManager em = emf.getEntityManager( applicationId );
organizationName = em.getApplication().getOrganizationName();
- ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
- readScheduler = Schedulers.from( readThreadPoolExecutor );
-
ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
writeScheduler = Schedulers.from( writeThreadPoolExecutor );
@@ -155,19 +135,18 @@ public class ExportApp extends ExportingToolBase {
collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
public Observable<ExportEntity> call(String collection) {
- return Observable.create( new EntityObservable( em, collection ))
+ return Observable.create( new EntityObservable( em, collection ) )
.doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
}
-
- }, 10).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
-
+
+ }, writeThreadCount ).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
+
public Observable<ExportConnection> call(ExportEntity exportEntity) {
- return Observable.create( new ConnectionsObservable( em, exportEntity ))
+ return Observable.create( new ConnectionsObservable( em, exportEntity ) )
.doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
}
-
- }, 10)
- .subscribeOn( readScheduler )
+
+ }, writeThreadCount)
.doOnCompleted( new FileWrapUpAction() )
.toBlocking().last();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b58390d9/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index c5411fd..a1e3f6b 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -24,7 +24,6 @@ import org.apache.usergrid.management.ApplicationInfo;
import org.apache.usergrid.management.OrganizationOwnerInfo;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
-import org.junit.Assert;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,9 +131,9 @@ public class ExportAppTest {
File exportDir = new File(directoryName);
assertTrue( getFileCount( exportDir, "entities" ) > 0 );
- assertTrue( getFileCount( exportDir, "collections" ) > 0 );
- assertTrue( getFileCount( exportDir, "entities" ) >= 100 );
- assertTrue( getFileCount( exportDir, "collections" ) >= 100 );
+ assertTrue( getFileCount( exportDir, "connections" ) > 0 );
+ assertTrue( getFileCount( exportDir, "entities" ) <= 100 );
+ assertTrue( getFileCount( exportDir, "connections" ) <= 100 );
File exportDir1 = new File(directoryName + "1");
exportApp.startTool( new String[]{
@@ -147,9 +146,8 @@ public class ExportAppTest {
logger.info( "1 thread time = " + (System.currentTimeMillis() - start) / 1000 + "s" );
- exportDir = new File(directoryName);
- assertEquals( 1, getFileCount( exportDir, "entities" ));
- assertEquals( 1, getFileCount( exportDir, "collections" ));
+ assertEquals( 1, getFileCount( exportDir1, "entities" ));
+ assertEquals( 1, getFileCount( exportDir1, "connections" ));
}
private static int getFileCount(File exportDir, final String ext ) {
[03/10] incubator-usergrid git commit: Remove queues and make the
whole thing one "stream"
Posted by sf...@apache.org.
Remove queues and make the whole thing one "stream"
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7a870d69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7a870d69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7a870d69
Branch: refs/heads/master
Commit: 7a870d6929cea76f5bbec9aa8f5a8caa8dee07e4
Parents: 2b65e61
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 13:31:18 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 13:31:18 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportApp.java | 375 +++++--------------
.../usergrid/tools/ExportingToolBase.java | 2 +-
.../apache/usergrid/tools/ExportAppTest.java | 114 +++++-
3 files changed, 185 insertions(+), 306 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index 21c63a0..59509c0 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
+import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -42,7 +43,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@@ -59,40 +61,29 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class ExportApp extends ExportingToolBase {
static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
-
- private static final String READ_THREAD_COUNT = "readThreads";
- private static final String WRITE_THREAD_COUNT = "writeThreads";
-
- // we will write two types of files: entities and connections
- BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
- BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
static final String APPLICATION_NAME = "application";
-
- int pollTimeoutSeconds = 10;
-
- int readThreadCount = 80;
- int writeThreadCount = 10;
-
+ private static final String READ_THREAD_COUNT = "readThreads";
+ private static final String WRITE_THREAD_COUNT = "writeThreads";
+
String applicationName;
String organizationName;
- // limiting output threads will limit output files
+ AtomicInteger entitiesWritten = new AtomicInteger(0);
+ AtomicInteger connectionsWritten = new AtomicInteger(0);
+
Scheduler readScheduler;
+ Scheduler writeScheduler;
+ ObjectMapper mapper = new ObjectMapper();
Map<Thread, JsonGenerator> entityGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
- List<String> emptyFiles = new ArrayList<String>();
+ // set via CLI
+ int readThreadCount = 80;
+ int writeThreadCount = 10; // limiting write will limit output files
- AtomicInteger activePollers = new AtomicInteger(0);
- AtomicInteger entitiesQueued = new AtomicInteger(0);
- AtomicInteger entitiesWritten = new AtomicInteger(0);
- AtomicInteger connectionsWritten = new AtomicInteger(0);
- AtomicInteger connectionsQueued = new AtomicInteger(0);
- ObjectMapper mapper = new ObjectMapper();
-
/**
* Tool entry point.
*/
@@ -123,7 +114,7 @@ public class ExportApp extends ExportingToolBase {
readScheduler = Schedulers.from( readThreadPoolExecutor );
ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
- final Scheduler writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+ writeScheduler = Schedulers.from( writeThreadPoolExecutor );
startSpring();
@@ -138,78 +129,26 @@ public class ExportApp extends ExportingToolBase {
final EntityManager em = emf.getEntityManager( applicationId );
organizationName = em.getApplication().getOrganizationName();
- // start write queue workers
-
- EntityWritesOnSubscribe entityWritesOnSub = new EntityWritesOnSubscribe( entityWriteQueue );
- rx.Observable entityWritesObservable = rx.Observable.create( entityWritesOnSub );
- entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
- public Observable<ExportEntity> call(ExportEntity exportEntity) {
- return Observable.just(exportEntity).doOnNext(
- new EntityWriteAction() ).subscribeOn( writeScheduler );
- }
- },10).subscribeOn( writeScheduler ).subscribe();
-
- ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
- rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
- connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
- public Observable<ExportConnection> call(ExportConnection connection ) {
- return Observable.just(connection).doOnNext(
- new ConnectionWriteAction()).subscribeOn( writeScheduler );
+ Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
+
+ collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
+
+ public Observable<ExportEntity> call(String collection) {
+ return Observable.create( new EntityObservable( em, collection ))
+ .doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
}
- },10).subscribeOn( writeScheduler ).subscribe();
-
- // start processing data and filling up write queues
-
- CollectionsOnSubscribe onSubscribe = new CollectionsOnSubscribe( em );
- rx.Observable collectionsObservable = rx.Observable.create( onSubscribe );
- collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
- public Observable<String> call(String collection) {
- return Observable.just(collection).doOnNext(
- new CollectionAction( em ) ).subscribeOn( readScheduler );
+
+ }, 10).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
+
+ public Observable<ExportConnection> call(ExportEntity exportEntity) {
+ return Observable.create( new ConnectionsObservable( em, exportEntity ))
+ .doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
}
- },40).subscribeOn( readScheduler ).subscribe();
-
- // wait for write thread pollers to get started
-
- try { Thread.sleep( 1000 ); } catch (InterruptedException ignored) {}
-
- // wait for write-thread pollers to stop
-
- while ( activePollers.get() > 0 ) {
- logger.info(
- "Active write threads: {}\n"
- +"Entities written: {}\n"
- +"Entities queued: {}\n"
- +"Connections written: {}\n"
- +"Connections queued: {}\n",
- new Object[] {
- activePollers.get(),
- entitiesWritten.get(),
- entitiesQueued.get(),
- connectionsWritten.get(),
- connectionsQueued.get()} );
- try { Thread.sleep( 5000 ); } catch (InterruptedException ignored) {}
- }
-
- // wrap up files
-
- for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
- //gen.writeEndArray();
- gen.flush();
- gen.close();
- }
-
- for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
- //gen.writeEndArray();
- gen.flush();
- gen.close();
- }
-
- for ( String fileName : emptyFiles ) {
- File emptyFile = new File(fileName);
- emptyFile.deleteOnExit();
- }
-
+
+ }, 10)
+ .subscribeOn( readScheduler )
+ .doOnCompleted( new FileWrapUpAction() )
+ .toBlocking().last();
}
@Override
@@ -222,12 +161,12 @@ public class ExportApp extends ExportingToolBase {
.withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
options.addOption( appNameOption );
- Option readThreadsOption = OptionBuilder
- .hasArg().withType(0).withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+ Option readThreadsOption = OptionBuilder.hasArg().withType(0)
+ .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
options.addOption( readThreadsOption );
- Option writeThreadsOption = OptionBuilder
- .hasArg().withType(0).withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+ Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
+ .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
options.addOption( writeThreadsOption );
return options;
@@ -239,17 +178,15 @@ public class ExportApp extends ExportingToolBase {
/**
* Emits collection names found in application.
*/
- class CollectionsOnSubscribe implements rx.Observable.OnSubscribe<String> {
+ class CollectionsObservable implements rx.Observable.OnSubscribe<String> {
EntityManager em;
- public CollectionsOnSubscribe( EntityManager em ) {
+ public CollectionsObservable(EntityManager em) {
this.em = em;
}
public void call(Subscriber<? super String> subscriber) {
- logger.info("Starting to read collections");
-
int count = 0;
try {
Map<String, Object> collectionMetadata = em.getApplicationCollectionMetadata();
@@ -261,11 +198,12 @@ public class ExportApp extends ExportingToolBase {
} catch (Exception e) {
subscriber.onError( e );
}
- logger.info("Done. Read {} collection names", count);
if ( count > 0 ) {
subscriber.onCompleted();
+ logger.info( "Completed. Read {} collection names", count );
} else {
subscriber.unsubscribe();
+ logger.info( "No collections found" );
}
}
}
@@ -273,11 +211,11 @@ public class ExportApp extends ExportingToolBase {
/**
* Emits entities of collection.
*/
- class EntityOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
+ class EntityObservable implements rx.Observable.OnSubscribe<ExportEntity> {
EntityManager em;
String collection;
- public EntityOnSubscribe(EntityManager em, String collection) {
+ public EntityObservable(EntityManager em, String collection) {
this.em = em;
this.collection = collection;
}
@@ -286,6 +224,8 @@ public class ExportApp extends ExportingToolBase {
logger.info("Starting to read entities of collection {}", collection);
+ subscriber.onStart();
+
try {
int count = 0;
@@ -327,10 +267,11 @@ public class ExportApp extends ExportingToolBase {
results = em.searchCollection( em.getApplicationRef(), collection, query );
}
- logger.info("Done. Read {} entities", count);
if ( count > 0 ) {
subscriber.onCompleted();
+ logger.info("Completed collection {}. Read {} entities", collection, count);
} else {
+ logger.info("Completed collection {} empty", collection );
subscriber.unsubscribe();
}
@@ -343,18 +284,19 @@ public class ExportApp extends ExportingToolBase {
/**
* Emits connections of an entity.
*/
- class ConnectionsOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
+ class ConnectionsObservable implements rx.Observable.OnSubscribe<ExportConnection> {
EntityManager em;
ExportEntity exportEntity;
- public ConnectionsOnSubscribe(EntityManager em, ExportEntity exportEntity) {
+ public ConnectionsObservable(EntityManager em, ExportEntity exportEntity) {
this.em = em;
this.exportEntity = exportEntity;
}
public void call(Subscriber<? super ExportConnection> subscriber) {
- logger.info("Starting to read connections for entity type {}", exportEntity.getEntity().getType());
+ logger.info( "Starting to read connections for entity {} type {}",
+ exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
int count = 0;
@@ -388,173 +330,16 @@ public class ExportApp extends ExportingToolBase {
} catch (Exception e) {
subscriber.onError( e );
}
-
- logger.info("Done. Read {} connections", count);
- if ( count > 0 ) {
- subscriber.onCompleted();
- } else {
- subscriber.unsubscribe();
- }
- }
- }
-
- /**
- * Process collection by starting processing of its entities.
- */
- class CollectionAction implements Action1<String> {
- EntityManager em;
-
- public CollectionAction( EntityManager em ) {
- this.em = em;
- }
-
- public void call(String collection) {
-
- // process entities of collection in parallel
- EntityOnSubscribe onSubscribe = new EntityOnSubscribe( em, collection );
- rx.Observable entityObservable = rx.Observable.create( onSubscribe );
- entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
- public Observable<ExportEntity> call(ExportEntity exportEntity) {
- return Observable.just(exportEntity).doOnNext(
- new EntityAction( em ) ).subscribeOn( readScheduler );
- }
- }, 8).subscribeOn(readScheduler).toBlocking().last();
- }
- }
-
- /**
- * Process entity by adding it to entityWriteQueue and starting processing of its connections.
- */
- class EntityAction implements Action1<ExportEntity> {
- EntityManager em;
-
- public EntityAction( EntityManager em ) {
- this.em = em;
- }
-
- public void call(ExportEntity exportEntity) {
- //logger.debug( "Processing entity: " + exportEntity.getEntity().getUuid() );
-
- entityWriteQueue.add( exportEntity );
- entitiesQueued.getAndIncrement();
-
- // if entity has connections, process them in parallel
- try {
- Results connectedEntities = em.getConnectedEntities(
- exportEntity.getEntity().getUuid(), null, null, Results.Level.CORE_PROPERTIES );
-
- if ( !connectedEntities.isEmpty() ) {
- ConnectionsOnSubscribe onSubscribe = new ConnectionsOnSubscribe( em, exportEntity );
- rx.Observable entityObservable = rx.Observable.create( onSubscribe );
-
- entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
- public Observable<ExportConnection> call(ExportConnection connection) {
- return Observable.just(connection).doOnNext(
- new ConnectionsAction() ).subscribeOn(readScheduler);
- }
- }, 8).subscribeOn(readScheduler).toBlocking().last();
- }
-
- } catch (Exception e) {
- throw new RuntimeException( "Error getting connections", e );
- }
- }
- }
-
- /**
- * Process connection by adding it to connectionWriteQueue.
- */
- class ConnectionsAction implements Action1<ExportConnection> {
-
- public void call(ExportConnection conn) {
- //logger.debug( "Processing connections for entity: " + conn.getSourceUuid() );
- connectionWriteQueue.add(conn);
- connectionsQueued.getAndIncrement();
- }
- }
-
-
- // ----------------------------------------------------------------------------------------
- // writing data
-
- /**
- * Emits entities to be written.
- */
- class EntityWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
- BlockingQueue<ExportEntity> queue;
-
- public EntityWritesOnSubscribe( BlockingQueue<ExportEntity> queue ) {
- this.queue = queue;
- }
-
- public void call(Subscriber<? super ExportEntity> subscriber) {
- int count = 0;
-
- while ( true ) {
- ExportEntity entity = null;
- try {
- //logger.debug( "Wrote {}. Polling for entity to write...", count );
- activePollers.getAndIncrement();
- entity = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
- } catch (InterruptedException e) {
- logger.error("Entity poll interrupted", e);
- continue;
- } finally {
- activePollers.getAndDecrement();
- }
- if ( entity == null ) {
- break;
- }
- subscriber.onNext( entity );
- count++;
- }
-
- logger.info("Done. De-queued {} entities", count);
- if ( count > 0 ) {
- subscriber.onCompleted();
- } else {
- subscriber.unsubscribe();
- }
- }
- }
-
- /**
- * Emits connections to be written.
- */
- class ConnectionWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
- BlockingQueue<ExportConnection> queue;
-
- public ConnectionWritesOnSubscribe( BlockingQueue<ExportConnection> queue ) {
- this.queue = queue;
- }
-
- public void call(Subscriber<? super ExportConnection> subscriber) {
- int count = 0;
- while ( true ) {
- ExportConnection connection = null;
- try {
- //logger.debug( "Wrote {}. Polling for connection to write", count );
- activePollers.getAndIncrement();
- connection = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
- } catch (InterruptedException e) {
- logger.error("Connection poll interrupted", e);
- continue;
- } finally {
- activePollers.getAndDecrement();
- }
- if ( connection == null ) {
- break;
- }
- subscriber.onNext( connection );
- count++;
- }
-
- logger.info("Done. De-queued {} connections", count);
if ( count > 0 ) {
subscriber.onCompleted();
+ logger.info("Completed entity {} type {} connections count {}",
+ new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
+
} else {
subscriber.unsubscribe();
+ logger.info( "Entity {} type {} has no connections",
+ exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
}
}
}
@@ -566,10 +351,9 @@ public class ExportApp extends ExportingToolBase {
public void call(ExportEntity entity) {
- boolean wroteData = false;
-
String [] parts = Thread.currentThread().getName().split("-");
- String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".entities";
+ String fileName = outputDir.getAbsolutePath() + File.separator
+ + applicationName.replace('/','-') + "-" + parts[3] + ".entities";
JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
if ( gen == null ) {
@@ -577,6 +361,7 @@ public class ExportApp extends ExportingToolBase {
// no generator so we are opening new file and writing the start of an array
try {
gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+ logger.info("Opened output file {}", fileName);
} catch (IOException e) {
throw new RuntimeException("Error opening output file: " + fileName, e);
}
@@ -589,15 +374,10 @@ public class ExportApp extends ExportingToolBase {
gen.writeObject( entity );
gen.writeRaw('\n');
entitiesWritten.getAndIncrement();
- wroteData = true;
} catch (IOException e) {
throw new RuntimeException("Error writing to output file: " + fileName, e);
}
-
- if ( !wroteData ) {
- emptyFiles.add( fileName );
- }
}
}
@@ -608,10 +388,9 @@ public class ExportApp extends ExportingToolBase {
public void call(ExportConnection conn) {
- boolean wroteData = false;
-
String [] parts = Thread.currentThread().getName().split("-");
- String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".connections";
+ String fileName = outputDir.getAbsolutePath() + File.separator
+ + applicationName.replace('/','-') + "-" + parts[3] + ".connections";
JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
if ( gen == null ) {
@@ -619,6 +398,7 @@ public class ExportApp extends ExportingToolBase {
// no generator so we are opening new file and writing the start of an array
try {
gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+ logger.info("Opened output file {}", fileName);
} catch (IOException e) {
throw new RuntimeException("Error opening output file: " + fileName, e);
}
@@ -631,18 +411,41 @@ public class ExportApp extends ExportingToolBase {
gen.writeObject( conn );
gen.writeRaw('\n');
connectionsWritten.getAndIncrement();
- wroteData = true;
} catch (IOException e) {
throw new RuntimeException("Error writing to output file: " + fileName, e);
}
+ }
+ }
+
+ private class FileWrapUpAction implements Action0 {
+ @Override
+ public void call() {
+
+ logger.info("-------------------------------------------------------------------");
+ logger.info("DONE! Entities: {} Connections: {}", entitiesWritten.get(), connectionsWritten.get());
+ logger.info("-------------------------------------------------------------------");
- if ( !wroteData ) {
- emptyFiles.add( fileName );
+ for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
+ try {
+ //gen.writeEndArray();
+ gen.flush();
+ gen.close();
+ } catch (IOException e) {
+ logger.error("Error closing output file", e);
+ }
+ }
+ for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
+ try {
+ //gen.writeEndArray();
+ gen.flush();
+ gen.close();
+ } catch (IOException e) {
+ logger.error("Error closing output file", e);
+ }
}
}
}
-
}
class ExportEntity {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
index 3de220c..a7c3905 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
@@ -138,7 +138,7 @@ public abstract class ExportingToolBase extends ToolBase {
if ( !file.mkdirs() ) {
- throw new RuntimeException( String.format( "Unable to create diretory %s", dirName ) );
+ throw new RuntimeException( String.format( "Unable to create directory %s", dirName ) );
}
return file;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index 14b9311..af8306f 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -27,14 +27,26 @@ import org.apache.usergrid.persistence.EntityManager;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
public class ExportAppTest {
static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
+
+ int NUM_COLLECTIONS = 5;
+ int NUM_ENTITIES = 10;
+ int NUM_CONNECTIONS = 1;
@ClassRule
public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
@@ -52,46 +64,110 @@ public class ExportAppTest {
ApplicationInfo appInfo = setup.getMgmtSvc().createApplication(
orgInfo.getOrganization().getUuid(), "app_" + rand );
- EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
+ final EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
- // create 10 connected things
+ // create connected things
- List<Entity> connectedThings = new ArrayList<Entity>();
+ final List<Entity> connectedThings = new ArrayList<Entity>();
String connectedType = "connected_thing";
em.createApplicationCollection(connectedType);
- for ( int j=0; j<10; j++) {
+ for ( int j=0; j<NUM_CONNECTIONS; j++) {
final String name = "connected_thing_" + j;
connectedThings.add( em.create( connectedType, new HashMap<String, Object>() {{
put( "name", name );
}} ) );
}
- // create 10 collections of 10 things, every other thing is connected to the connected things
-
- for ( int i=0; i<10; i++) {
- String type = "thing_"+i;
- em.createApplicationCollection(type);
- for ( int j=0; j<10; j++) {
- final String name = "thing_" + j;
- Entity source = em.create(type, new HashMap<String, Object>() {{ put("name", name); }});
- if ( j % 2 == 0 ) {
- for ( Entity target : connectedThings ) {
- em.createConnection( source, "has", target );
+ // create collections of things, every other thing is connected to the connected things
+
+ final AtomicInteger entitiesCount = new AtomicInteger(0);
+ final AtomicInteger connectionCount = new AtomicInteger(0);
+
+ ExecutorService execService = Executors.newFixedThreadPool( 50);
+ final Scheduler scheduler = Schedulers.from( execService );
+
+ Observable.range( 0, NUM_COLLECTIONS ).flatMap( new Func1<Integer, Observable<?>>() {
+ @Override
+ public Observable<?> call(Integer i) {
+
+ return Observable.just( i ).doOnNext( new Action1<Integer>() {
+ @Override
+ public void call(Integer i) {
+
+ final String type = "thing_"+i;
+ try {
+ em.createApplicationCollection( type );
+ connectionCount.getAndIncrement();
+
+ } catch (Exception e) {
+ throw new RuntimeException( "Error creating collection", e );
+ }
+
+ Observable.range( 0, NUM_ENTITIES ).flatMap( new Func1<Integer, Observable<?>>() {
+ @Override
+ public Observable<?> call(Integer j) {
+ return Observable.just( j ).doOnNext( new Action1<Integer>() {
+ @Override
+ public void call(Integer j) {
+
+ final String name = "thing_" + j;
+ try {
+ final Entity source = em.create(
+ type, new HashMap<String, Object>() {{ put("name", name); }});
+ entitiesCount.getAndIncrement();
+ logger.info( "Created entity {} type {}", name, type );
+
+ for ( Entity target : connectedThings ) {
+ em.createConnection( source, "has", target );
+ connectionCount.getAndIncrement();
+ logger.info( "Created connection from entity {} type {} to {}",
+ new Object[]{name, type, target.getName()} );
+ }
+
+
+ } catch (Exception e) {
+ throw new RuntimeException( "Error creating collection", e );
+ }
+
+
+ }
+
+ } );
+
+ }
+ }, 50 ).subscribeOn( scheduler ).subscribe(); // toBlocking().last();
+
}
- }
+ } );
+
+
}
+ }, 30 ).subscribeOn( scheduler ).toBlocking().last();
+
+ while ( entitiesCount.get() < NUM_COLLECTIONS * NUM_ENTITIES ) {
+ Thread.sleep( 5000 );
+ logger.info( "Still working. Created {} entities and {} connections",
+ entitiesCount.get(), connectionCount.get() );
}
-
- // export to file
- String directoryName = "./target/export" + rand;
+ logger.info( "Done. Created {} entities and {} connections", entitiesCount.get(), connectionCount.get() );
+
+ long start = System.currentTimeMillis();
+
+ String directoryName = "target/export" + rand;
ExportApp exportApp = new ExportApp();
exportApp.startTool( new String[]{
"-application", appInfo.getName(),
+ "-readThreads", "50",
+ "-writeThreads", "10",
"-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
"-outputDir", directoryName
}, false );
+ logger.info("time = " + (System.currentTimeMillis() - start)/1000 + "s");
+
+
+
}
}
\ No newline at end of file
[08/10] incubator-usergrid git commit: Merge branch 'master' into
rxportapp
Posted by sf...@apache.org.
Merge branch 'master' into rxportapp
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2748347a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2748347a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2748347a
Branch: refs/heads/master
Commit: 2748347a3e3e19b4db3467e429ab9e27f5742892
Parents: b58390d e1b352e
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Jul 20 16:00:55 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Jul 20 16:00:55 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportAdmins.java | 37 ++++++++------------
.../org/apache/usergrid/tools/ImportAdmins.java | 5 +--
2 files changed, 17 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
[05/10] incubator-usergrid git commit: Less test data and code to
compare 1 read and 1 write thread vs. 100 read and 100 write threads.
Posted by sf...@apache.org.
Less test data and code to compare 1 read and 1 write thread vs. 100 read and 100 write threads.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7b168b91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7b168b91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7b168b91
Branch: refs/heads/master
Commit: 7b168b91d99ba51da452a9c7980b0078f733df03
Parents: a25f8eb
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 16:41:06 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 16:41:06 2015 -0400
----------------------------------------------------------------------
.../apache/usergrid/tools/ExportAppTest.java | 24 ++++++++++++++------
1 file changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b168b91/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index d1c5c1f..eeaae13 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -41,9 +41,9 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ExportAppTest {
static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
- int NUM_COLLECTIONS = 20;
- int NUM_ENTITIES = 200;
- int NUM_CONNECTIONS = 5;
+ int NUM_COLLECTIONS = 10;
+ int NUM_ENTITIES = 50;
+ int NUM_CONNECTIONS = 3;
@ClassRule
public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
@@ -113,12 +113,22 @@ public class ExportAppTest {
ExportApp exportApp = new ExportApp();
exportApp.startTool( new String[]{
"-application", appInfo.getName(),
- "-readThreads", "50",
- "-writeThreads", "10",
+ "-readThreads", "100",
+ "-writeThreads", "100",
"-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
"-outputDir", directoryName
}, false );
-
- logger.info("time = " + (System.currentTimeMillis() - start)/1000 + "s");
+
+ logger.info("100 read and 100 write threads = " + (System.currentTimeMillis() - start)/1000 + "s");
+
+ exportApp.startTool( new String[]{
+ "-application", appInfo.getName(),
+ "-readThreads", "1",
+ "-writeThreads", "1",
+ "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
+ "-outputDir", directoryName + "1"
+ }, false );
+
+ logger.info("1 thread time = " + (System.currentTimeMillis() - start)/1000 + "s");
}
}
\ No newline at end of file
[02/10] incubator-usergrid git commit: Add readThread and writeThread
CLI parameters.
Posted by sf...@apache.org.
Add readThread and writeThread CLI parameters.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2b65e619
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2b65e619
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2b65e619
Branch: refs/heads/master
Commit: 2b65e619316b206720e574a21fe1b33462e0f2dd
Parents: b2bdbb5
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 08:03:18 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 08:03:18 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportApp.java | 153 +++++++++++++------
1 file changed, 106 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2b65e619/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index ceb3ecd..21c63a0 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.utils.StringUtils;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
@@ -46,11 +47,22 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
- * Export application's collections.
+ * Export all entities and connections of a Usergrid app.
+ *
+ * Exports data files to specified directory.
+ *
+ * Will create as many output files as there are writeThreads (by default: 10).
+ *
+ * Will create two types of files: *.entities for Usergrid entities and *.connections for entity to entity connections.
+ *
+ * Every line of the data files is a complete JSON object.
*/
public class ExportApp extends ExportingToolBase {
static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
+ private static final String READ_THREAD_COUNT = "readThreads";
+ private static final String WRITE_THREAD_COUNT = "writeThreads";
+
// we will write two types of files: entities and connections
BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
@@ -58,10 +70,15 @@ public class ExportApp extends ExportingToolBase {
static final String APPLICATION_NAME = "application";
int pollTimeoutSeconds = 10;
+
+ int readThreadCount = 80;
+ int writeThreadCount = 10;
+
+ String applicationName;
+ String organizationName;
// limiting output threads will limit output files
- final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(8);
- final Scheduler scheduler = Schedulers.from( threadPoolExecutor );
+ Scheduler readScheduler;
Map<Thread, JsonGenerator> entityGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
@@ -77,29 +94,37 @@ public class ExportApp extends ExportingToolBase {
ObjectMapper mapper = new ObjectMapper();
/**
- * Export admin users using multiple threads.
- * <p/>
- * How it works:
- * In main thread we query for IDs of all admin users, add each ID to read queue.
- * Read-queue workers read admin user data, add data to write queue.
- * One write-queue worker reads data writes to file.
+ * Tool entry point.
*/
@Override
public void runTool(CommandLine line) throws Exception {
- String applicationName = line.getOptionValue( APPLICATION_NAME );
-
-// if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
-// try {
-// readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
-// } catch (NumberFormatException nfe) {
-// logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
-// return;
-// }
-// } else {
-// readThreadCount = 20;
-// }
+ applicationName = line.getOptionValue( APPLICATION_NAME );
+
+ if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
+ try {
+ readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
+ } catch (NumberFormatException nfe) {
+ logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+ return;
+ }
+ }
+ if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
+ try {
+ writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
+ } catch (NumberFormatException nfe) {
+ logger.error( "-" + WRITE_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+ return;
+ }
+ }
+
+ ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
+ readScheduler = Schedulers.from( readThreadPoolExecutor );
+
+ ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
+ final Scheduler writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+
startSpring();
setVerbose( line );
@@ -111,6 +136,7 @@ public class ExportApp extends ExportingToolBase {
UUID applicationId = emf.lookupApplication( applicationName );
final EntityManager em = emf.getEntityManager( applicationId );
+ organizationName = em.getApplication().getOrganizationName();
// start write queue workers
@@ -119,18 +145,18 @@ public class ExportApp extends ExportingToolBase {
entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
public Observable<ExportEntity> call(ExportEntity exportEntity) {
return Observable.just(exportEntity).doOnNext(
- new EntityWriteAction() ).subscribeOn( scheduler );
+ new EntityWriteAction() ).subscribeOn( writeScheduler );
}
- },10).subscribeOn( scheduler ).subscribe();
+ },10).subscribeOn( writeScheduler ).subscribe();
ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
public Observable<ExportConnection> call(ExportConnection connection ) {
return Observable.just(connection).doOnNext(
- new ConnectionWriteAction()).subscribeOn( scheduler );
+ new ConnectionWriteAction()).subscribeOn( writeScheduler );
}
- },10).subscribeOn( scheduler ).subscribe();
+ },10).subscribeOn( writeScheduler ).subscribe();
// start processing data and filling up write queues
@@ -139,9 +165,9 @@ public class ExportApp extends ExportingToolBase {
collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
public Observable<String> call(String collection) {
return Observable.just(collection).doOnNext(
- new CollectionAction( em ) ).subscribeOn( Schedulers.io() );
+ new CollectionAction( em ) ).subscribeOn( readScheduler );
}
- },40).subscribeOn( Schedulers.io() ).subscribe();
+ },40).subscribeOn( readScheduler ).subscribe();
// wait for write thread pollers to get started
@@ -192,14 +218,18 @@ public class ExportApp extends ExportingToolBase {
Options options = super.createOptions();
- Option readThreads = OptionBuilder.hasArg().withType("")
+ Option appNameOption = OptionBuilder.hasArg().withType("")
.withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
- options.addOption( readThreads );
-
-// Option readThreads = OptionBuilder
-// .hasArg().withType(0).withDescription("Read Threads -" + READ_THREAD_COUNT).create(READ_THREAD_COUNT);
-// options.addOption( readThreads );
-
+ options.addOption( appNameOption );
+
+ Option readThreadsOption = OptionBuilder
+ .hasArg().withType(0).withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+ options.addOption( readThreadsOption );
+
+ Option writeThreadsOption = OptionBuilder
+ .hasArg().withType(0).withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+ options.addOption( writeThreadsOption );
+
return options;
}
@@ -276,9 +306,13 @@ public class ExportApp extends ExportingToolBase {
}
dictionariesByName.put( dictionary, dict );
}
+
ExportEntity exportEntity = new ExportEntity(
- em.getApplication().getApplicationName(),
- entity, dictionariesByName );
+ organizationName,
+ applicationName,
+ entity,
+ dictionariesByName );
+
subscriber.onNext( exportEntity );
count++;
@@ -333,11 +367,14 @@ public class ExportApp extends ExportingToolBase {
for (Entity connectedEntity : results.getEntities()) {
try {
+
ExportConnection connection = new ExportConnection(
- em.getApplication().getApplicationName(),
+ applicationName,
+ organizationName,
connectionType,
exportEntity.getEntity().getUuid(),
connectedEntity.getUuid());
+
subscriber.onNext( connection );
count++;
@@ -379,9 +416,9 @@ public class ExportApp extends ExportingToolBase {
entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
public Observable<ExportEntity> call(ExportEntity exportEntity) {
return Observable.just(exportEntity).doOnNext(
- new EntityAction( em ) ).subscribeOn( Schedulers.io() );
+ new EntityAction( em ) ).subscribeOn( readScheduler );
}
- }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+ }, 8).subscribeOn(readScheduler).toBlocking().last();
}
}
@@ -413,9 +450,9 @@ public class ExportApp extends ExportingToolBase {
entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
public Observable<ExportConnection> call(ExportConnection connection) {
return Observable.just(connection).doOnNext(
- new ConnectionsAction() ).subscribeOn( Schedulers.io() );
+ new ConnectionsAction() ).subscribeOn(readScheduler);
}
- }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+ }, 8).subscribeOn(readScheduler).toBlocking().last();
}
} catch (Exception e) {
@@ -472,7 +509,7 @@ public class ExportApp extends ExportingToolBase {
count++;
}
- logger.info("Done. Wrote {} entities", count);
+ logger.info("Done. De-queued {} entities", count);
if ( count > 0 ) {
subscriber.onCompleted();
} else {
@@ -513,7 +550,7 @@ public class ExportApp extends ExportingToolBase {
count++;
}
- logger.info("Done. Wrote {} connections", count);
+ logger.info("Done. De-queued {} connections", count);
if ( count > 0 ) {
subscriber.onCompleted();
} else {
@@ -531,7 +568,8 @@ public class ExportApp extends ExportingToolBase {
boolean wroteData = false;
- String fileName = "target/" + Thread.currentThread().getName() + ".ude";
+ String [] parts = Thread.currentThread().getName().split("-");
+ String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".entities";
JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
if ( gen == null ) {
@@ -572,7 +610,8 @@ public class ExportApp extends ExportingToolBase {
boolean wroteData = false;
- String fileName = "target/" + Thread.currentThread().getName() + ".ugc";
+ String [] parts = Thread.currentThread().getName().split("-");
+ String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".connections";
JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
if ( gen == null ) {
@@ -607,10 +646,12 @@ public class ExportApp extends ExportingToolBase {
}
class ExportEntity {
+ private String organization;
private String application;
private Entity entity;
private Map<String, Object> dictionaries;
- public ExportEntity( String application, Entity entity, Map<String, Object> dictionaries ) {
+ public ExportEntity( String organization, String application, Entity entity, Map<String, Object> dictionaries ) {
+ this.organization = organization;
this.application = application;
this.entity = entity;
this.dictionaries = dictionaries;
@@ -639,14 +680,24 @@ class ExportEntity {
public void setDictionaries(Map<String, Object> dictionaries) {
this.dictionaries = dictionaries;
}
+
+ public String getOrganization() {
+ return organization;
+ }
+
+ public void setOrganization(String organization) {
+ this.organization = organization;
+ }
}
class ExportConnection {
+ private String organization;
private String application;
private String connectionType;
private UUID sourceUuid;
private UUID targetUuid;
- public ExportConnection(String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+ public ExportConnection(String organization, String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+ this.organization= organization;
this.application = application;
this.connectionType = connectionType;
this.sourceUuid = sourceUuid;
@@ -684,4 +735,12 @@ class ExportConnection {
public void setTargetUuid(UUID targetUuid) {
this.targetUuid = targetUuid;
}
+
+ public String getOrganization() {
+ return organization;
+ }
+
+ public void setOrganization(String organization) {
+ this.organization = organization;
+ }
}
[10/10] incubator-usergrid git commit: merge
Posted by sf...@apache.org.
merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9b9f55ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9b9f55ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9b9f55ac
Branch: refs/heads/master
Commit: 9b9f55ac20f669451dadc7cdeeb4643ac0614ad4
Parents: a6e68a0 897fd50
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Jul 21 14:09:05 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Jul 21 14:09:05 2015 -0600
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportAdmins.java | 118 ++++++----
.../org/apache/usergrid/tools/ImportAdmins.java | 226 ++++++++++++++-----
.../usergrid/tools/ExportImportAdminsTest.java | 71 ++++--
...adata.usergrid-management.1433331614293.json | 52 +++++
...users.usergrid-management.1433331614293.json | 12 +
5 files changed, 358 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
[06/10] incubator-usergrid git commit: Minor test fixes.
Posted by sf...@apache.org.
Minor test fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/dab84e95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/dab84e95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/dab84e95
Branch: refs/heads/master
Commit: dab84e95b54ff9ac573cfa291708938b5e181b61
Parents: 7b168b9
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 15 12:34:26 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 15 12:34:26 2015 -0400
----------------------------------------------------------------------
.../apache/usergrid/tools/ExportAppTest.java | 33 ++++++++++++++++++--
1 file changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dab84e95/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index eeaae13..c5411fd 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -24,12 +24,15 @@ import org.apache.usergrid.management.ApplicationInfo;
import org.apache.usergrid.management.OrganizationOwnerInfo;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
+import org.junit.Assert;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Scheduler;
import rx.schedulers.Schedulers;
+import java.io.File;
+import java.io.FileFilter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -37,7 +40,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * TODO: better test, this is really just a smoke test.
+ */
public class ExportAppTest {
static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
@@ -119,8 +128,15 @@ public class ExportAppTest {
"-outputDir", directoryName
}, false );
- logger.info("100 read and 100 write threads = " + (System.currentTimeMillis() - start)/1000 + "s");
+ logger.info( "100 read and 100 write threads = " + (System.currentTimeMillis() - start) / 1000 + "s" );
+
+ File exportDir = new File(directoryName);
+ assertTrue( getFileCount( exportDir, "entities" ) > 0 );
+ assertTrue( getFileCount( exportDir, "collections" ) > 0 );
+ assertTrue( getFileCount( exportDir, "entities" ) >= 100 );
+ assertTrue( getFileCount( exportDir, "collections" ) >= 100 );
+ File exportDir1 = new File(directoryName + "1");
exportApp.startTool( new String[]{
"-application", appInfo.getName(),
"-readThreads", "1",
@@ -129,6 +145,19 @@ public class ExportAppTest {
"-outputDir", directoryName + "1"
}, false );
- logger.info("1 thread time = " + (System.currentTimeMillis() - start)/1000 + "s");
+ logger.info( "1 thread time = " + (System.currentTimeMillis() - start) / 1000 + "s" );
+
+ exportDir = new File(directoryName);
+ assertEquals( 1, getFileCount( exportDir, "entities" ));
+ assertEquals( 1, getFileCount( exportDir, "collections" ));
+ }
+
+ private static int getFileCount(File exportDir, final String ext ) {
+ return exportDir.listFiles( new FileFilter() {
+ @Override
+ public boolean accept(File pathname) {
+ return pathname.getAbsolutePath().endsWith("." + ext);
+ }
+ } ).length;
}
}
\ No newline at end of file