You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/22 17:18:40 UTC
[01/18] incubator-usergrid git commit: Add new RxJava based
multi-threaded ExportApp tool, and upgrade to RxJava 1.0.12.
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-dev b1393b4ed -> 89dd0ad98
Add new RxJava based multi-threaded ExportApp tool, and upgrade to RxJava 1.0.12.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b2bdbb54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b2bdbb54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b2bdbb54
Branch: refs/heads/two-dot-o-dev
Commit: b2bdbb5456301dbcf7131e780d968514cdd7e55d
Parents: 75ad454
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Jul 13 10:21:09 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Jul 13 10:21:09 2015 -0400
----------------------------------------------------------------------
stack/core/pom.xml | 9 +-
stack/pom.xml | 2 +-
.../org/apache/usergrid/tools/ExportApp.java | 687 +++++++++++++++++++
.../apache/usergrid/tools/ExportAppTest.java | 97 +++
4 files changed, 790 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 47fa840..f60dbc2 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -573,14 +573,15 @@
</dependency>
<dependency>
- <groupId>com.netflix.rxjava</groupId>
- <artifactId>rxjava-core</artifactId>
+ <groupId>io.reactivex</groupId>
+ <artifactId>rxjava</artifactId>
<version>${rx.version}</version>
</dependency>
+
<dependency>
- <groupId>com.netflix.rxjava</groupId>
+ <groupId>io.reactivex</groupId>
<artifactId>rxjava-math</artifactId>
- <version>${rx.version}</version>
+ <version>1.0.0</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index bdc3549..da1b62c 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -108,7 +108,7 @@
<antlr.version>3.4</antlr.version>
<tika.version>1.4</tika.version>
<metrics.version>3.0.0</metrics.version>
- <rx.version>0.19.6</rx.version>
+ <rx.version>1.0.12</rx.version>
</properties>
<licenses>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
new file mode 100644
index 0000000..ceb3ecd
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.Results;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.util.MinimalPrettyPrinter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Export application's collections.
+ */
+public class ExportApp extends ExportingToolBase {
+ static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
+
+ // we will write two types of files: entities and connections
+ BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
+ BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
+
+ static final String APPLICATION_NAME = "application";
+
+ int pollTimeoutSeconds = 10;
+
+ // limiting output threads will limit output files
+ final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(8);
+ final Scheduler scheduler = Schedulers.from( threadPoolExecutor );
+
+ Map<Thread, JsonGenerator> entityGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
+ Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
+
+ List<String> emptyFiles = new ArrayList<String>();
+
+ AtomicInteger activePollers = new AtomicInteger(0);
+ AtomicInteger entitiesQueued = new AtomicInteger(0);
+ AtomicInteger entitiesWritten = new AtomicInteger(0);
+ AtomicInteger connectionsWritten = new AtomicInteger(0);
+ AtomicInteger connectionsQueued = new AtomicInteger(0);
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ /**
+ * Export admin users using multiple threads.
+ * <p/>
+ * How it works:
+ * In main thread we query for IDs of all admin users, add each ID to read queue.
+ * Read-queue workers read admin user data, add data to write queue.
+ * One write-queue worker reads data writes to file.
+ */
+ @Override
+ public void runTool(CommandLine line) throws Exception {
+
+ String applicationName = line.getOptionValue( APPLICATION_NAME );
+
+// if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
+// try {
+// readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
+// } catch (NumberFormatException nfe) {
+// logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+// return;
+// }
+// } else {
+// readThreadCount = 20;
+// }
+
+ startSpring();
+
+ setVerbose( line );
+
+ applyOrgId( line );
+ prepareBaseOutputFileName( line );
+ outputDir = createOutputParentDir();
+ logger.info( "Export directory: " + outputDir.getAbsolutePath() );
+
+ UUID applicationId = emf.lookupApplication( applicationName );
+ final EntityManager em = emf.getEntityManager( applicationId );
+
+ // start write queue workers
+
+ EntityWritesOnSubscribe entityWritesOnSub = new EntityWritesOnSubscribe( entityWriteQueue );
+ rx.Observable entityWritesObservable = rx.Observable.create( entityWritesOnSub );
+ entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
+ public Observable<ExportEntity> call(ExportEntity exportEntity) {
+ return Observable.just(exportEntity).doOnNext(
+ new EntityWriteAction() ).subscribeOn( scheduler );
+ }
+ },10).subscribeOn( scheduler ).subscribe();
+
+ ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
+ rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
+ connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
+ public Observable<ExportConnection> call(ExportConnection connection ) {
+ return Observable.just(connection).doOnNext(
+ new ConnectionWriteAction()).subscribeOn( scheduler );
+ }
+ },10).subscribeOn( scheduler ).subscribe();
+
+ // start processing data and filling up write queues
+
+ CollectionsOnSubscribe onSubscribe = new CollectionsOnSubscribe( em );
+ rx.Observable collectionsObservable = rx.Observable.create( onSubscribe );
+ collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
+ public Observable<String> call(String collection) {
+ return Observable.just(collection).doOnNext(
+ new CollectionAction( em ) ).subscribeOn( Schedulers.io() );
+ }
+ },40).subscribeOn( Schedulers.io() ).subscribe();
+
+ // wait for write thread pollers to get started
+
+ try { Thread.sleep( 1000 ); } catch (InterruptedException ignored) {}
+
+ // wait for write-thread pollers to stop
+
+ while ( activePollers.get() > 0 ) {
+ logger.info(
+ "Active write threads: {}\n"
+ +"Entities written: {}\n"
+ +"Entities queued: {}\n"
+ +"Connections written: {}\n"
+ +"Connections queued: {}\n",
+ new Object[] {
+ activePollers.get(),
+ entitiesWritten.get(),
+ entitiesQueued.get(),
+ connectionsWritten.get(),
+ connectionsQueued.get()} );
+ try { Thread.sleep( 5000 ); } catch (InterruptedException ignored) {}
+ }
+
+ // wrap up files
+
+ for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
+ //gen.writeEndArray();
+ gen.flush();
+ gen.close();
+ }
+
+ for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
+ //gen.writeEndArray();
+ gen.flush();
+ gen.close();
+ }
+
+ for ( String fileName : emptyFiles ) {
+ File emptyFile = new File(fileName);
+ emptyFile.deleteOnExit();
+ }
+
+ }
+
+ @Override
+ @SuppressWarnings("static-access")
+ public Options createOptions() {
+
+ Options options = super.createOptions();
+
+ Option readThreads = OptionBuilder.hasArg().withType("")
+ .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
+ options.addOption( readThreads );
+
+// Option readThreads = OptionBuilder
+// .hasArg().withType(0).withDescription("Read Threads -" + READ_THREAD_COUNT).create(READ_THREAD_COUNT);
+// options.addOption( readThreads );
+
+ return options;
+ }
+
+ // ----------------------------------------------------------------------------------------
+ // reading data
+
+ /**
+ * Emits collection names found in application.
+ */
+ class CollectionsOnSubscribe implements rx.Observable.OnSubscribe<String> {
+ EntityManager em;
+
+ public CollectionsOnSubscribe( EntityManager em ) {
+ this.em = em;
+ }
+
+ public void call(Subscriber<? super String> subscriber) {
+
+ logger.info("Starting to read collections");
+
+ int count = 0;
+ try {
+ Map<String, Object> collectionMetadata = em.getApplicationCollectionMetadata();
+ for ( String collection : collectionMetadata.keySet() ) {
+ subscriber.onNext( collection );
+ count++;
+ }
+
+ } catch (Exception e) {
+ subscriber.onError( e );
+ }
+ logger.info("Done. Read {} collection names", count);
+ if ( count > 0 ) {
+ subscriber.onCompleted();
+ } else {
+ subscriber.unsubscribe();
+ }
+ }
+ }
+
+ /**
+ * Emits entities of collection.
+ */
+ class EntityOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
+ EntityManager em;
+ String collection;
+
+ public EntityOnSubscribe(EntityManager em, String collection) {
+ this.em = em;
+ this.collection = collection;
+ }
+
+ public void call(Subscriber<? super ExportEntity> subscriber) {
+
+ logger.info("Starting to read entities of collection {}", collection);
+
+ try {
+ int count = 0;
+
+ Query query = new Query();
+ query.setLimit( MAX_ENTITY_FETCH );
+
+ Results results = em.searchCollection( em.getApplicationRef(), collection, query );
+
+ while (results.size() > 0) {
+ for (Entity entity : results.getEntities()) {
+ try {
+ Set<String> dictionaries = em.getDictionaries( entity );
+ Map dictionariesByName = new HashMap<String, Map<Object, Object>>();
+ for (String dictionary : dictionaries) {
+ Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
+ if (dict.isEmpty()) {
+ continue;
+ }
+ dictionariesByName.put( dictionary, dict );
+ }
+ ExportEntity exportEntity = new ExportEntity(
+ em.getApplication().getApplicationName(),
+ entity, dictionariesByName );
+ subscriber.onNext( exportEntity );
+ count++;
+
+ } catch (Exception e) {
+ logger.error("Error reading entity " + entity.getUuid() +" from collection " + collection);
+ }
+ }
+ if (results.getCursor() == null) {
+ break;
+ }
+ query.setCursor( results.getCursor() );
+ results = em.searchCollection( em.getApplicationRef(), collection, query );
+ }
+
+ logger.info("Done. Read {} entities", count);
+ if ( count > 0 ) {
+ subscriber.onCompleted();
+ } else {
+ subscriber.unsubscribe();
+ }
+
+ } catch ( Exception e ) {
+ subscriber.onError(e);
+ }
+ }
+ }
+
+ /**
+ * Emits connections of an entity.
+ */
+ class ConnectionsOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
+ EntityManager em;
+ ExportEntity exportEntity;
+
+ public ConnectionsOnSubscribe(EntityManager em, ExportEntity exportEntity) {
+ this.em = em;
+ this.exportEntity = exportEntity;
+ }
+
+ public void call(Subscriber<? super ExportConnection> subscriber) {
+
+ logger.info("Starting to read connections for entity type {}", exportEntity.getEntity().getType());
+
+ int count = 0;
+
+ try {
+ Set<String> connectionTypes = em.getConnectionTypes( exportEntity.getEntity() );
+ for (String connectionType : connectionTypes) {
+
+ Results results = em.getConnectedEntities(
+ exportEntity.getEntity().getUuid(), connectionType, null, Results.Level.CORE_PROPERTIES );
+
+ for (Entity connectedEntity : results.getEntities()) {
+ try {
+ ExportConnection connection = new ExportConnection(
+ em.getApplication().getApplicationName(),
+ connectionType,
+ exportEntity.getEntity().getUuid(),
+ connectedEntity.getUuid());
+ subscriber.onNext( connection );
+ count++;
+
+ } catch (Exception e) {
+ logger.error( "Error reading connection entity "
+ + exportEntity.getEntity().getUuid() + " -> " + connectedEntity.getType());
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ subscriber.onError( e );
+ }
+
+ logger.info("Done. Read {} connections", count);
+ if ( count > 0 ) {
+ subscriber.onCompleted();
+ } else {
+ subscriber.unsubscribe();
+ }
+ }
+ }
+
+ /**
+ * Process collection by starting processing of its entities.
+ */
+ class CollectionAction implements Action1<String> {
+ EntityManager em;
+
+ public CollectionAction( EntityManager em ) {
+ this.em = em;
+ }
+
+ public void call(String collection) {
+
+ // process entities of collection in parallel
+ EntityOnSubscribe onSubscribe = new EntityOnSubscribe( em, collection );
+ rx.Observable entityObservable = rx.Observable.create( onSubscribe );
+ entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
+ public Observable<ExportEntity> call(ExportEntity exportEntity) {
+ return Observable.just(exportEntity).doOnNext(
+ new EntityAction( em ) ).subscribeOn( Schedulers.io() );
+ }
+ }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+ }
+ }
+
+ /**
+ * Process entity by adding it to entityWriteQueue and starting processing of its connections.
+ */
+ class EntityAction implements Action1<ExportEntity> {
+ EntityManager em;
+
+ public EntityAction( EntityManager em ) {
+ this.em = em;
+ }
+
+ public void call(ExportEntity exportEntity) {
+ //logger.debug( "Processing entity: " + exportEntity.getEntity().getUuid() );
+
+ entityWriteQueue.add( exportEntity );
+ entitiesQueued.getAndIncrement();
+
+ // if entity has connections, process them in parallel
+ try {
+ Results connectedEntities = em.getConnectedEntities(
+ exportEntity.getEntity().getUuid(), null, null, Results.Level.CORE_PROPERTIES );
+
+ if ( !connectedEntities.isEmpty() ) {
+ ConnectionsOnSubscribe onSubscribe = new ConnectionsOnSubscribe( em, exportEntity );
+ rx.Observable entityObservable = rx.Observable.create( onSubscribe );
+
+ entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
+ public Observable<ExportConnection> call(ExportConnection connection) {
+ return Observable.just(connection).doOnNext(
+ new ConnectionsAction() ).subscribeOn( Schedulers.io() );
+ }
+ }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+ }
+
+ } catch (Exception e) {
+ throw new RuntimeException( "Error getting connections", e );
+ }
+ }
+ }
+
+ /**
+ * Process connection by adding it to connectionWriteQueue.
+ */
+ class ConnectionsAction implements Action1<ExportConnection> {
+
+ public void call(ExportConnection conn) {
+ //logger.debug( "Processing connections for entity: " + conn.getSourceUuid() );
+ connectionWriteQueue.add(conn);
+ connectionsQueued.getAndIncrement();
+ }
+ }
+
+
+ // ----------------------------------------------------------------------------------------
+ // writing data
+
+ /**
+ * Emits entities to be written.
+ */
+ class EntityWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
+ BlockingQueue<ExportEntity> queue;
+
+ public EntityWritesOnSubscribe( BlockingQueue<ExportEntity> queue ) {
+ this.queue = queue;
+ }
+
+ public void call(Subscriber<? super ExportEntity> subscriber) {
+ int count = 0;
+
+ while ( true ) {
+ ExportEntity entity = null;
+ try {
+ //logger.debug( "Wrote {}. Polling for entity to write...", count );
+ activePollers.getAndIncrement();
+ entity = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
+ } catch (InterruptedException e) {
+ logger.error("Entity poll interrupted", e);
+ continue;
+ } finally {
+ activePollers.getAndDecrement();
+ }
+ if ( entity == null ) {
+ break;
+ }
+ subscriber.onNext( entity );
+ count++;
+ }
+
+ logger.info("Done. Wrote {} entities", count);
+ if ( count > 0 ) {
+ subscriber.onCompleted();
+ } else {
+ subscriber.unsubscribe();
+ }
+ }
+ }
+
+ /**
+ * Emits connections to be written.
+ */
+ class ConnectionWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
+ BlockingQueue<ExportConnection> queue;
+
+ public ConnectionWritesOnSubscribe( BlockingQueue<ExportConnection> queue ) {
+ this.queue = queue;
+ }
+
+ public void call(Subscriber<? super ExportConnection> subscriber) {
+ int count = 0;
+
+ while ( true ) {
+ ExportConnection connection = null;
+ try {
+ //logger.debug( "Wrote {}. Polling for connection to write", count );
+ activePollers.getAndIncrement();
+ connection = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
+ } catch (InterruptedException e) {
+ logger.error("Connection poll interrupted", e);
+ continue;
+ } finally {
+ activePollers.getAndDecrement();
+ }
+ if ( connection == null ) {
+ break;
+ }
+ subscriber.onNext( connection );
+ count++;
+ }
+
+ logger.info("Done. Wrote {} connections", count);
+ if ( count > 0 ) {
+ subscriber.onCompleted();
+ } else {
+ subscriber.unsubscribe();
+ }
+ }
+ }
+
+ /**
+ * Writes entities to JSON file.
+ */
+ class EntityWriteAction implements Action1<ExportEntity> {
+
+ public void call(ExportEntity entity) {
+
+ boolean wroteData = false;
+
+ String fileName = "target/" + Thread.currentThread().getName() + ".ude";
+
+ JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
+ if ( gen == null ) {
+
+ // no generator so we are opening new file and writing the start of an array
+ try {
+ gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+ } catch (IOException e) {
+ throw new RuntimeException("Error opening output file: " + fileName, e);
+ }
+ gen.setPrettyPrinter( new MinimalPrettyPrinter(""));
+ gen.setCodec( mapper );
+ entityGeneratorsByThread.put( Thread.currentThread(), gen );
+ }
+
+ try {
+ gen.writeObject( entity );
+ gen.writeRaw('\n');
+ entitiesWritten.getAndIncrement();
+ wroteData = true;
+
+ } catch (IOException e) {
+ throw new RuntimeException("Error writing to output file: " + fileName, e);
+ }
+
+ if ( !wroteData ) {
+ emptyFiles.add( fileName );
+ }
+ }
+ }
+
+ /**
+ * Writes connection to JSON file.
+ */
+ class ConnectionWriteAction implements Action1<ExportConnection> {
+
+ public void call(ExportConnection conn) {
+
+ boolean wroteData = false;
+
+ String fileName = "target/" + Thread.currentThread().getName() + ".ugc";
+
+ JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
+ if ( gen == null ) {
+
+ // no generator so we are opening new file and writing the start of an array
+ try {
+ gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+ } catch (IOException e) {
+ throw new RuntimeException("Error opening output file: " + fileName, e);
+ }
+ gen.setPrettyPrinter( new MinimalPrettyPrinter(""));
+ gen.setCodec( mapper );
+ connectionGeneratorsByThread.put( Thread.currentThread(), gen );
+ }
+
+ try {
+ gen.writeObject( conn );
+ gen.writeRaw('\n');
+ connectionsWritten.getAndIncrement();
+ wroteData = true;
+
+ } catch (IOException e) {
+ throw new RuntimeException("Error writing to output file: " + fileName, e);
+ }
+
+ if ( !wroteData ) {
+ emptyFiles.add( fileName );
+ }
+ }
+ }
+
+}
+
+class ExportEntity {
+ private String application;
+ private Entity entity;
+ private Map<String, Object> dictionaries;
+ public ExportEntity( String application, Entity entity, Map<String, Object> dictionaries ) {
+ this.application = application;
+ this.entity = entity;
+ this.dictionaries = dictionaries;
+ }
+
+ public String getApplication() {
+ return application;
+ }
+
+ public void setApplication(String application) {
+ this.application = application;
+ }
+
+ public Entity getEntity() {
+ return entity;
+ }
+
+ public void setEntity(Entity entity) {
+ this.entity = entity;
+ }
+
+ public Map<String, Object> getDictionaries() {
+ return dictionaries;
+ }
+
+ public void setDictionaries(Map<String, Object> dictionaries) {
+ this.dictionaries = dictionaries;
+ }
+}
+
+class ExportConnection {
+ private String application;
+ private String connectionType;
+ private UUID sourceUuid;
+ private UUID targetUuid;
+ public ExportConnection(String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+ this.application = application;
+ this.connectionType = connectionType;
+ this.sourceUuid = sourceUuid;
+ this.targetUuid = targetUuid;
+ }
+
+ public String getApplication() {
+ return application;
+ }
+
+ public void setApplication(String application) {
+ this.application = application;
+ }
+
+ public String getConnectionType() {
+ return connectionType;
+ }
+
+ public void setConnectionType(String connectionType) {
+ this.connectionType = connectionType;
+ }
+
+ public UUID getSourceUuid() {
+ return sourceUuid;
+ }
+
+ public void setSourceUuid(UUID sourceUuid) {
+ this.sourceUuid = sourceUuid;
+ }
+
+ public UUID getTargetUuid() {
+ return targetUuid;
+ }
+
+ public void setTargetUuid(UUID targetUuid) {
+ this.targetUuid = targetUuid;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
new file mode 100644
index 0000000..14b9311
--- /dev/null
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.ServiceITSetup;
+import org.apache.usergrid.ServiceITSetupImpl;
+import org.apache.usergrid.ServiceITSuite;
+import org.apache.usergrid.management.ApplicationInfo;
+import org.apache.usergrid.management.OrganizationOwnerInfo;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.junit.ClassRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+
+public class ExportAppTest {
+ static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
+
+ @ClassRule
+ public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
+
+ @org.junit.Test
+ public void testBasicOperation() throws Exception {
+
+ String rand = RandomStringUtils.randomAlphanumeric( 10 );
+
+ // create app with some data
+
+ OrganizationOwnerInfo orgInfo = setup.getMgmtSvc().createOwnerAndOrganization(
+ "org_" + rand, "user_" + rand, rand.toUpperCase(), rand + "@example.com", rand );
+
+ ApplicationInfo appInfo = setup.getMgmtSvc().createApplication(
+ orgInfo.getOrganization().getUuid(), "app_" + rand );
+
+ EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
+
+ // create 10 connected things
+
+ List<Entity> connectedThings = new ArrayList<Entity>();
+ String connectedType = "connected_thing";
+ em.createApplicationCollection(connectedType);
+ for ( int j=0; j<10; j++) {
+ final String name = "connected_thing_" + j;
+ connectedThings.add( em.create( connectedType, new HashMap<String, Object>() {{
+ put( "name", name );
+ }} ) );
+ }
+
+ // create 10 collections of 10 things, every other thing is connected to the connected things
+
+ for ( int i=0; i<10; i++) {
+ String type = "thing_"+i;
+ em.createApplicationCollection(type);
+ for ( int j=0; j<10; j++) {
+ final String name = "thing_" + j;
+ Entity source = em.create(type, new HashMap<String, Object>() {{ put("name", name); }});
+ if ( j % 2 == 0 ) {
+ for ( Entity target : connectedThings ) {
+ em.createConnection( source, "has", target );
+ }
+ }
+ }
+ }
+
+ // export to file
+
+ String directoryName = "./target/export" + rand;
+
+ ExportApp exportApp = new ExportApp();
+ exportApp.startTool( new String[]{
+ "-application", appInfo.getName(),
+ "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
+ "-outputDir", directoryName
+ }, false );
+
+ }
+}
\ No newline at end of file
[15/18] incubator-usergrid git commit: Merge branch 'master' into
two-dot-o
Posted by sn...@apache.org.
Merge branch 'master' into two-dot-o
Conflicts:
stack/core/pom.xml
stack/pom.xml
stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
stack/tools/src/main/resources/log4j.properties
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/066d7db4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/066d7db4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/066d7db4
Branch: refs/heads/two-dot-o-dev
Commit: 066d7db46786cc042cdc4fa28b56eb2f729561d1
Parents: 764a7c8 9b9f55a
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 22 09:44:58 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 22 09:44:58 2015 -0400
----------------------------------------------------------------------
stack/core/pom.xml | 1 -
stack/pom.xml | 2 +-
stack/tools/pom.xml | 6 +
.../org/apache/usergrid/tools/ExportAdmins.java | 117 ++--
.../org/apache/usergrid/tools/ExportApp.java | 536 +++++++++++++++++++
.../usergrid/tools/ExportDataCreator.java | 244 +++++++--
.../usergrid/tools/ExportingToolBase.java | 2 +-
.../org/apache/usergrid/tools/ImportAdmins.java | 226 ++++++--
.../org/apache/usergrid/tools/ToolBase.java | 2 +-
stack/tools/src/main/resources/log4j.properties | 5 +
.../apache/usergrid/tools/ExportAppTest.java | 118 ++++
.../usergrid/tools/ExportImportAdminsTest.java | 71 ++-
...adata.usergrid-management.1433331614293.json | 52 ++
...users.usergrid-management.1433331614293.json | 12 +
14 files changed, 1225 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/066d7db4/stack/core/pom.xml
----------------------------------------------------------------------
diff --cc stack/core/pom.xml
index d91208d,f60dbc2..5e65ac3
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@@ -473,23 -573,16 +473,22 @@@
</dependency>
<dependency>
- <groupId>io.reactivex</groupId>
- <artifactId>rxjava</artifactId>
+ <groupId>com.netflix.rxjava</groupId>
+ <artifactId>rxjava-core</artifactId>
<version>${rx.version}</version>
</dependency>
-
-
<dependency>
- <groupId>io.reactivex</groupId>
+ <groupId>com.netflix.rxjava</groupId>
<artifactId>rxjava-math</artifactId>
- <version>1.0.0</version>
+ <version>${rx.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>com.clearspring.analytics</groupId>
+ <artifactId>stream</artifactId>
+ <version>2.7.0</version>
+ </dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/066d7db4/stack/pom.xml
----------------------------------------------------------------------
diff --cc stack/pom.xml
index cc61a7c,0e1b32d..cc39e04
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@@ -32,107 -32,84 +32,107 @@@
<description>Parent module for the Apache Usergrid Project</description>
<packaging>pom</packaging>
- <properties>
- <!-- =================================================================== -->
- <!-- Properties: Deployment Setting Defaults -->
- <!-- =================================================================== -->
- <!-- NOTE: override from the CLI or settings.xml -->
- <!-- NOTE: add server credentials config via settings -->
- <!-- NOTE: <settings> -->
- <!-- NOTE: <servers> -->
- <!-- NOTE: <server> -->
- <!-- NOTE: <id>usergrid.releases</id> -->
- <!-- NOTE: <username>akarasulu</username> -->
- <!-- NOTE: <password>*********</password> -->
- <!-- NOTE: </server> -->
- <!-- NOTE: <server> -->
- <!-- NOTE: <id>usergrid.snapshots</id> -->
- <!-- NOTE: <username>akarasulu</username> -->
- <!-- NOTE: <password>*********</password> -->
- <!-- NOTE: </server> -->
- <!-- NOTE: </servers> -->
- <!-- NOTE: -->
- <!-- NOTE: <profiles> -->
- <!-- NOTE: <profile> -->
- <!-- NOTE: <id>deployment</id> -->
- <!-- NOTE: <properties> -->
- <!-- NOTE: <release.repository.url> -->
- <!-- NOTE: https://to/your/custom/releases/repository -->
- <!-- NOTE: </release.repository.url> -->
- <!-- NOTE: <snapshot.repository.url> -->
- <!-- NOTE: https://to/your/custom/snapshots/repository -->
- <!-- NOTE: </shapshot.repository.url> -->
- <!-- NOTE: </properties> -->
- <!-- NOTE: </profile> -->
- <!-- NOTE: </profiles> -->
- <!-- NOTE: -->
- <!-- NOTE: <activeProfiles> -->
- <!-- NOTE: <activeProfile>deployment</activeProfile> -->
- <!-- NOTE: </activeProfiles> -->
- <!-- NOTE: </settings> -->
-
- <!-- =================================================================== -->
- <!-- Properties: General Settings -->
- <!-- =================================================================== -->
-
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-
- <!-- you can override these via MAVEN_OPTS -->
- <ug.heapmax>2048m</ug.heapmax>
- <ug.heapmin>2048m</ug.heapmin>
- <ug.argline>-Djava.awt.headless=true</ug.argline>
-
- <usergrid-custom-spring-properties>classpath:/usergrid-custom.properties</usergrid-custom-spring-properties>
- <usergrid-custom-spring-test-properties>classpath:/usergrid-custom-test.properties</usergrid-custom-spring-test-properties>
-
- <!-- =================================================================== -->
- <!-- Properties: Dependency Settings -->
- <!-- =================================================================== -->
-
- <amber-version>0.22-incubating</amber-version>
- <cassandra-version>1.2.12</cassandra-version>
- <hector-om-version>3.0-03</hector-om-version>
- <hector-version>1.1-4</hector-version>
- <hector-test-version>1.1-4</hector-test-version>
- <jackson-version>1.9.9</jackson-version>
- <jclouds.version>1.7.1</jclouds.version>
- <jersey-version>1.18</jersey-version>
- <junit-version>4.11</junit-version>
- <log4j-version>1.2.16</log4j-version>
- <metrics-version>2.1.2</metrics-version>
- <org.springframework.version>3.1.2.RELEASE</org.springframework.version>
- <shiro-version>1.2.0</shiro-version>
- <slf4j-version>1.6.1</slf4j-version>
- <snakeyaml-version>1.9</snakeyaml-version>
- <tomcat-version>7.0.42</tomcat-version>
- <antlr.version>3.4</antlr.version>
- <tika.version>1.4</tika.version>
- <metrics.version>3.0.0</metrics.version>
- <rx.version>1.0.12</rx.version>
- </properties>
+ <properties>
+ <!-- =================================================================== -->
+ <!-- Properties: Deployment Setting Defaults -->
+ <!-- =================================================================== -->
+ <!-- NOTE: override from the CLI or settings.xml -->
+ <!-- NOTE: add server credentials config via settings -->
+ <!-- NOTE: <settings> -->
+ <!-- NOTE: <servers> -->
+ <!-- NOTE: <server> -->
+ <!-- NOTE: <id>usergrid.releases</id> -->
+ <!-- NOTE: <username>akarasulu</username> -->
+ <!-- NOTE: <password>*********</password> -->
+ <!-- NOTE: </server> -->
+ <!-- NOTE: <server> -->
+ <!-- NOTE: <id>usergrid.snapshots</id> -->
+ <!-- NOTE: <username>akarasulu</username> -->
+ <!-- NOTE: <password>*********</password> -->
+ <!-- NOTE: </server> -->
+ <!-- NOTE: </servers> -->
+ <!-- NOTE: -->
+ <!-- NOTE: <profiles> -->
+ <!-- NOTE: <profile> -->
+ <!-- NOTE: <id>deployment</id> -->
+ <!-- NOTE: <properties> -->
+ <!-- NOTE: <release.repository.url> -->
+ <!-- NOTE: https://to/your/custom/releases/repository -->
+ <!-- NOTE: </release.repository.url> -->
+ <!-- NOTE: <snapshot.repository.url> -->
+ <!-- NOTE: https://to/your/custom/snapshots/repository -->
+ <!-- NOTE: </shapshot.repository.url> -->
+ <!-- NOTE: </properties> -->
+ <!-- NOTE: </profile> -->
+ <!-- NOTE: </profiles> -->
+ <!-- NOTE: -->
+ <!-- NOTE: <activeProfiles> -->
+ <!-- NOTE: <activeProfile>deployment</activeProfile> -->
+ <!-- NOTE: </activeProfiles> -->
+ <!-- NOTE: </settings> -->
+
+ <snapshot.repository.url>
+ https://repository.apache.org/content/repositories/snapshots
+ </snapshot.repository.url>
+ <release.repository.url>
+ https://repository.apache.org/service/local/staging/deploy/maven2
+ </release.repository.url>
+
+ <!-- =================================================================== -->
+ <!-- Properties: General Settings -->
+ <!-- =================================================================== -->
+
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+
+ <!-- you can override these via MAVEN_OPTS -->
+ <ug.heapmax>4096m</ug.heapmax>
+ <ug.heapmin>2048m</ug.heapmin>
+ <ug.argline>-Djava.awt.headless=true</ug.argline>
+
+ <usergrid-custom-spring-properties>classpath:/usergrid-deployment.properties</usergrid-custom-spring-properties>
+ <usergrid-custom-spring-test-properties>classpath:/usergrid-custom-test.properties</usergrid-custom-spring-test-properties>
+
+ <!-- =================================================================== -->
+ <!-- Properties: Dependency Settings -->
+ <!-- =================================================================== -->
+
+ <amber-version>0.22-incubating</amber-version>
+ <cassandra-version>1.2.18</cassandra-version>
+ <guava.version>18.0</guava.version>
+ <guice.version>4.0-beta5</guice.version>
+ <hector-om-version>3.0-03</hector-om-version>
+ <hector-version>1.1-4</hector-version>
+ <hector-test-version>1.1-4</hector-test-version>
+ <jackson-version>1.9.9</jackson-version>
+ <jackson-2-version>2.3.3</jackson-2-version>
+ <jclouds.version>1.8.0</jclouds.version>
+ <jersey-version>1.18.1</jersey-version>
+ <junit-version>4.12</junit-version>
+ <log4j-version>1.2.16</log4j-version>
+ <org.springframework.version>3.1.2.RELEASE</org.springframework.version>
+ <shiro-version>1.2.3</shiro-version>
+ <slf4j-version>1.6.1</slf4j-version>
- <snakeyaml-version>1.8</snakeyaml-version>
++ <snakeyaml-version>1.9</snakeyaml-version>
+ <tomcat-version>7.0.59</tomcat-version>
+ <antlr.version>3.4</antlr.version>
+ <tika.version>1.4</tika.version>
+ <mockito.version>1.10.8</mockito.version>
+
+ <!-- only use half the cores on the machine for testing -->
+ <usergrid.it.parallel>methods</usergrid.it.parallel>
+ <usergrid.it.reuseForks>true</usergrid.it.reuseForks>
+ <usergrid.it.forkCount>1</usergrid.it.forkCount>
+ <usergrid.it.threads>8</usergrid.it.threads>
+
+ <metrics.version>3.0.0</metrics.version>
+ <rx.version>0.19.6</rx.version>
+ <surefire.plugin.artifactName>surefire-junit47</surefire.plugin.artifactName>
+ <surefire.plugin.version>2.18.1</surefire.plugin.version>
+ <powermock.version>1.6.1</powermock.version>
+
+ <maven.build.timestamp.format>yyyy-MM-dd'T'HH-mm-ss'Z'</maven.build.timestamp.format>
+
+ </properties>
<licenses>
<license>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/066d7db4/stack/tools/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/066d7db4/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
----------------------------------------------------------------------
diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
index e175d01,d5dd42c..0bb74ab
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
@@@ -22,12 -23,13 +22,10 @@@ import org.apache.commons.cli.CommandLi
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
-import org.apache.usergrid.management.UserInfo;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
- import org.apache.usergrid.management.UserInfo;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.Results;
- import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.Results.Level;
-import org.apache.usergrid.persistence.cassandra.CassandraService;
import org.apache.usergrid.utils.StringUtils;
import org.codehaus.jackson.JsonGenerator;
import org.slf4j.Logger;
@@@ -57,17 -61,29 +55,29 @@@ import java.util.concurrent.atomic.Atom
* cassandra.lock.keyspace=My_Usergrid_Locks
*/
public class ExportAdmins extends ExportingToolBase {
-
+
static final Logger logger = LoggerFactory.getLogger( ExportAdmins.class );
-
++
public static final String ADMIN_USERS_PREFIX = "admin-users";
public static final String ADMIN_USER_METADATA_PREFIX = "admin-user-metadata";
-
++
+ // map admin user UUID to list of organizations to which user belongs
+ private Map<UUID, List<Org>> userToOrgsMap = new HashMap<UUID, List<Org>>(50000);
+
+ private Map<String, UUID> orgNameToUUID = new HashMap<String, UUID>(50000);
-
++
+ private Set<UUID> orgsWritten = new HashSet<UUID>(50000);
-
++
+ private Set<UUID> duplicateOrgs = new HashSet<UUID>();
-
++
private static final String READ_THREAD_COUNT = "readThreads";
- private Map<String, List<Org>> orgMap = new HashMap<String, List<Org>>(80000);
private int readThreadCount;
- AtomicInteger count = new AtomicInteger( 0 );
+ AtomicInteger userCount = new AtomicInteger( 0 );
-
++
+ boolean ignoreInvalidUsers = false; // true to ignore users with no credentials or orgs
-
-
+
+
/**
* Represents an AdminUser that has been read and is ready for export.
*/
@@@ -169,10 -185,10 +179,10 @@@
while ( !done ) {
writeThread.join( 10000, 0 );
done = !writeThread.isAlive();
- logger.info( "Wrote {} users", count.get() );
+ logger.info( "Wrote {} users", userCount.get() );
}
}
-
+
@Override
@SuppressWarnings("static-access")
@@@ -207,10 -223,11 +217,11 @@@
organizations = em.searchCollection( em.getApplicationRef(), "groups", query );
for ( Entity organization : organizations.getEntities() ) {
execService.submit( new OrgMapWorker( organization ) );
+ count++;
}
- count++;
-
++
if ( count % 1000 == 0 ) {
- logger.info("Processed {} orgs for org map", count);
+ logger.info("Queued {} org map workers", count);
}
query.setCursor( organizations.getCursor() );
}
@@@ -218,8 -235,10 +229,10 @@@
execService.shutdown();
while ( !execService.awaitTermination( 10, TimeUnit.SECONDS ) ) {
- logger.info("Processed {} orgs for map", orgMap.size() );
+ logger.info( "Processed {} orgs for map", userToOrgsMap.size() );
}
-
++
+ logger.info("Org map complete, counted {} organizations", count);
}
@@@ -235,6 -254,7 +248,7 @@@
try {
final String orgName = orgEntity.getProperty( "path" ).toString();
final UUID orgId = orgEntity.getUuid();
-
++
for (UserInfo user : managementService.getAdminUsersForOrganization( orgEntity.getUuid() )) {
try {
Entity admin = managementService.getAdminUserEntityByUuid( user.getUuid() );
@@@ -297,10 -332,34 +326,34 @@@
AdminUserWriteTask task = new AdminUserWriteTask();
task.adminUser = entity;
- addDictionariesToTask( task, entity );
+ addDictionariesToTask( task, entity );
addOrganizationsToTask( task );
- writeQueue.add( task );
+ String actionTaken = "Processed";
+
+ if (ignoreInvalidUsers && (task.orgNamesByUuid.isEmpty()
+ || task.dictionariesByName.isEmpty()
+ || task.dictionariesByName.get( "credentials" ).isEmpty())) {
-
++
+ actionTaken = "Ignored";
-
++
+ } else {
+ writeQueue.add( task );
+ }
+
- Map<String, Object> creds = (Map<String, Object>) (task.dictionariesByName.isEmpty() ?
++ Map<String, Object> creds = (Map<String, Object>) (task.dictionariesByName.isEmpty() ?
+ 0 : task.dictionariesByName.get( "credentials" ));
-
++
+ logger.error( "{} admin user {}:{}:{} has organizations={} dictionaries={} credentials={}",
+ new Object[]{
+ actionTaken,
+ task.adminUser.getProperty( "username" ),
+ task.adminUser.getProperty( "email" ),
+ task.adminUser.getUuid(),
+ task.orgNamesByUuid.size(),
+ task.dictionariesByName.size(),
+ creds == null ? 0 : creds.size()
- } );
++ } );
} catch ( Exception e ) {
logger.error("Error reading data for user " + uuid, e );
@@@ -344,22 -391,17 +385,32 @@@
task.orgNamesByUuid = managementService.getOrganizationsForAdminUser( task.adminUser.getUuid() );
++<<<<<<< HEAD
+ List<Org> orgs = orgMap.get( task.adminUser.getProperty( "username" ).toString().toLowerCase() );
+
++=======
+ List<Org> orgs = userToOrgsMap.get( task.adminUser.getProperty( "username" ).toString().toLowerCase() );
-
++
++>>>>>>> master
if ( orgs != null && task.orgNamesByUuid.size() < orgs.size() ) {
-
++
+ // list of orgs from getOrganizationsForAdminUser() is less than expected, use userToOrgsMap
BiMap<UUID, String> bimap = HashBiMap.create();
for (Org org : orgs) {
bimap.put( org.orgId, org.orgName );
}
task.orgNamesByUuid = bimap;
}
++<<<<<<< HEAD
+
+ if ( task.orgNamesByUuid.isEmpty() ) {
+ logger.error("{}:{}:{} has no orgs", new Object[] {
+ task.adminUser.getProperty("username"),
+ task.adminUser.getProperty("email"),
+ task.adminUser.getUuid() } );
+ }
++=======
++>>>>>>> master
}
}
@@@ -485,6 -527,10 +536,10 @@@
jg.writeObject( orgs.get( uuid ) );
jg.writeEndObject();
-
++
+ synchronized (orgsWritten) {
+ orgsWritten.add( uuid );
+ }
}
jg.writeEndArray();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/066d7db4/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
----------------------------------------------------------------------
diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
index 7f8dd1b,3c427e1..c97dc9c
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
@@@ -50,8 -50,8 +50,8 @@@ import static org.apache.usergrid.utils
/**
- * Base class for Usergrid Tools commands. Any class that implements this can be called with
- * Base class for Usergrid Tools commands. Any class that implements this can be called with java -jar {jarname}
- * org.apache.usergrid.tools.{classname}.
++ * Base class for Usergrid Tools commands. Any class that implements this can be called with
+ * java -jar {jarname} org.apache.usergrid.tools.{classname}.
*/
public abstract class ToolBase {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/066d7db4/stack/tools/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --cc stack/tools/src/main/resources/log4j.properties
index 00834cf,def47b4..18ebcc4
--- a/stack/tools/src/main/resources/log4j.properties
+++ b/stack/tools/src/main/resources/log4j.properties
@@@ -26,23 -26,8 +26,28 @@@ log4j.appender.stdout=org.apache.log4j.
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) [%c] - %m%n
++<<<<<<< HEAD
+log4j.category.org.apache.usergrid.tools=TRACE
+log4j.category.org.apache.usergrid=ERROR
+
+log4j.logger.org.apache.usergrid.persistence.cassandra.DB=WARN, stdout
+log4j.logger.org.apache.usergrid.persistence.cassandra.BATCH=WARN, stdout
+log4j.logger.org.apache.usergrid.persistence.cassandra.EntityManagerFactoryImpl=WARN, stdout
+log4j.logger.org.apache.usergrid.persistence.cassandra.DaoUtils=WARN, stdout
+log4j.logger.org.apache.usergrid.persistence.cassandra.EntityManagerImpl=WARN, stdout
+log4j.logger.org.apache.usergrid.persistence.cassandra.ConnectionRefImpl=WARN, stdout
+log4j.logger.me.prettyprint.cassandra.hector.TimingLogger=WARN, stdout
+log4j.logger.org.apache.usergrid.rest.security.AllowAjaxFilter=WARN, stdout
+log4j.logger.me.prettyprint.hector.api.beans.AbstractComposite=ERROR, stdout
+#log4j.logger.org.apache.usergrid.locking.singlenode.SingleNodeLockManagerImpl=DEBUG, stdout
+#log4j.logger.org.apache.usergrid.persistence.hector.CountingMutator=INFO, stdout
+
+log4j.logger.org.apache.usergrid.management.cassandra=DEBUB
+log4j.logger.org.apache.usergrid.tools=INFO
++=======
+ log4j.logger.org.apache.usergrid=INFO
+ log4j.logger.org.apache.usergrid.tools=DEBUG
++>>>>>>> master
log4j.logger.org.apache.usergrid.management.cassandra=WARN
log4j.logger.org.apache.usergrid.persistence.cassandra.DB=WARN
[12/18] incubator-usergrid git commit: Merge branch 'master' into
rxportapp
Posted by sn...@apache.org.
Merge branch 'master' into rxportapp
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2748347a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2748347a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2748347a
Branch: refs/heads/two-dot-o-dev
Commit: 2748347a3e3e19b4db3467e429ab9e27f5742892
Parents: b58390d e1b352e
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Jul 20 16:00:55 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Jul 20 16:00:55 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportAdmins.java | 37 ++++++++------------
.../org/apache/usergrid/tools/ImportAdmins.java | 5 +--
2 files changed, 17 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
[08/18] incubator-usergrid git commit: Duplicate user merge.
Posted by sn...@apache.org.
Duplicate user merge.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/16de78d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/16de78d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/16de78d9
Branch: refs/heads/two-dot-o-dev
Commit: 16de78d9d5a42941c76907ffe2986b16c7195976
Parents: e1b352e
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Jul 16 13:56:17 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Jul 16 13:56:17 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportAdmins.java | 2 +
.../org/apache/usergrid/tools/ImportAdmins.java | 203 ++++++++++++++-----
stack/tools/src/main/resources/log4j.properties | 2 +-
.../usergrid/tools/ExportImportAdminsTest.java | 71 ++++---
...adata.usergrid-management.1433331614293.json | 52 +++++
...users.usergrid-management.1433331614293.json | 12 ++
6 files changed, 263 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16de78d9/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
index 2c14da1..ae9c16b 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
@@ -489,6 +489,8 @@ public class ExportAdmins extends ExportingToolBase {
jg.writeObject( orgs.get( uuid ) );
jg.writeEndObject();
+
+ logger.debug( "Exported organization {}:{}", uuid, orgs.get( uuid ) );
}
jg.writeEndArray();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16de78d9/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
index 0b693c8..c6aada7 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
@@ -17,6 +17,7 @@
package org.apache.usergrid.tools;
+import com.sun.org.apache.bcel.internal.generic.DUP;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
@@ -26,6 +27,8 @@ import org.apache.usergrid.management.OrganizationInfo;
import org.apache.usergrid.management.UserInfo;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Identifier;
+import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.entities.User;
import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
import org.codehaus.jackson.JsonFactory;
@@ -84,6 +87,7 @@ public class ImportAdmins extends ToolBase {
private Map<Stoppable, Thread> adminAuditThreads = new HashMap<Stoppable, Thread>();
private Map<Stoppable, Thread> metadataWorkerThreadMap = new HashMap<Stoppable, Thread>();
+ Map<UUID, DuplicateUser> dupsByDupUuid = new HashMap<UUID, DuplicateUser>(200);
JsonFactory jsonFactory = new JsonFactory();
@@ -94,6 +98,19 @@ public class ImportAdmins extends ToolBase {
AtomicInteger auditEmptyCount = new AtomicInteger( 0 );
AtomicInteger metadataEmptyCount = new AtomicInteger( 0 );
+
+ static class DuplicateUser {
+ String email;
+ String username;
+ public DuplicateUser( String propName, Map<String, Object> user ) {
+ if ( "email".equals(propName)) {
+ email = user.get("email").toString();
+ } else {
+ username = user.get("username").toString();
+ }
+ }
+ }
+
@Override
@@ -382,7 +399,7 @@ public class ImportAdmins extends ToolBase {
String entityOwnerId = jp.getCurrentName();
try {
- EntityRef entityRef = em.get( UUID.fromString( entityOwnerId ) );
+ EntityRef entityRef = new SimpleEntityRef( "user", UUID.fromString( entityOwnerId ) );
Map<String, Object> metadata = (Map<String, Object>) jp.readValueAs( Map.class );
workQueue.put( new ImportMetadataTask( entityRef, metadata ) );
@@ -408,31 +425,33 @@ public class ImportAdmins extends ToolBase {
private void importEntityMetadata(
EntityManager em, EntityRef entityRef, Map<String, Object> metadata) throws Exception {
- List<Object> organizationsList = (List<Object>) metadata.get("organizations");
- if (organizationsList != null && !organizationsList.isEmpty()) {
+ DuplicateUser dup = dupsByDupUuid.get( entityRef.getUuid() );
+
+ if ( dup == null ) { // not a duplicate
- User user = em.get(entityRef, User.class);
-
- if (user == null) {
- logger.error("User not found, not adding to organizations: "
- + (entityRef == null ? null : entityRef.getUuid()));
+ User user = em.get( entityRef, User.class );
+ final UserInfo userInfo = managementService.getAdminUserByEmail( user.getEmail() );
- } else {
+ if (user == null || userInfo == null) {
+ logger.error( "User {} does not exist, not processing metadata", entityRef.getUuid() );
+ return;
+ }
- final UserInfo userInfo = managementService.getAdminUserByEmail(user.getEmail());
+ List<Object> organizationsList = (List<Object>) metadata.get("organizations");
+ if (organizationsList != null && !organizationsList.isEmpty()) {
for (Object orgObject : organizationsList) {
Map<String, Object> orgMap = (Map<String, Object>) orgObject;
- UUID orgUuid = UUID.fromString((String) orgMap.get("uuid"));
- String orgName = (String) orgMap.get("name");
+ UUID orgUuid = UUID.fromString( (String) orgMap.get( "uuid" ) );
+ String orgName = (String) orgMap.get( "name" );
- // create org only if it does not exist
- OrganizationInfo orgInfo = managementService.getOrganizationByUuid(orgUuid);
- if (orgInfo == null) {
+ OrganizationInfo orgInfo = managementService.getOrganizationByUuid( orgUuid );
+
+ if (orgInfo == null) { // org does not exist yet, create it and add user
try {
- managementService.createOrganization(orgUuid, orgName, userInfo, false);
- orgInfo = managementService.getOrganizationByUuid(orgUuid);
+ managementService.createOrganization( orgUuid, orgName, userInfo, false );
+ orgInfo = managementService.getOrganizationByUuid( orgUuid );
logger.debug( "Created new org {} for user {}",
new Object[]{orgInfo.getName(), user.getEmail()} );
@@ -440,49 +459,107 @@ public class ImportAdmins extends ToolBase {
} catch (DuplicateUniquePropertyExistsException dpee) {
logger.debug( "Org {} already exists", orgName );
}
- } else {
+ } else { // org exists, add original user to it
try {
managementService.addAdminUserToOrganization( userInfo, orgInfo, false );
logger.debug( "Added user {} to org {}", new Object[]{user.getEmail(), orgName} );
-
- } catch ( Exception e ) {
+
+ } catch (Exception e) {
logger.error( "Error Adding user {} to org {}", new Object[]{user.getEmail(), orgName} );
}
}
}
}
+
+ Map<String, Object> dictionariesMap = (Map<String, Object>) metadata.get("dictionaries");
+ if (dictionariesMap != null && !dictionariesMap.isEmpty()) {
+ for (String name : dictionariesMap.keySet()) {
+ try {
+ Map<String, Object> dictionary = (Map<String, Object>) dictionariesMap.get(name);
+ em.addMapToDictionary( entityRef, name, dictionary);
- } else {
- logger.warn("User {} has no organizations", entityRef.getUuid() );
- }
-
- Map<String, Object> dictionariesMap = (Map<String, Object>) metadata.get("dictionaries");
-
- if (dictionariesMap != null && !dictionariesMap.isEmpty()) {
- for (String name : dictionariesMap.keySet()) {
- try {
- Map<String, Object> dictionary = (Map<String, Object>) dictionariesMap.get(name);
- em.addMapToDictionary( entityRef, name, dictionary);
-
- logger.debug( "Creating dictionary for {} name {}",
- new Object[]{entityRef, name} );
+ logger.debug( "Creating dictionary for {} name {}",
+ new Object[]{entityRef, name} );
- } catch (Exception e) {
- if (logger.isDebugEnabled()) {
- logger.error("Error importing dictionary name "
- + name + " for user " + entityRef.getUuid(), e);
- } else {
- logger.error("Error importing dictionary name "
- + name + " for user " + entityRef.getUuid());
+ } catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.error("Error importing dictionary name "
+ + name + " for user " + entityRef.getUuid(), e);
+ } else {
+ logger.error("Error importing dictionary name "
+ + name + " for user " + entityRef.getUuid());
+ }
}
}
+
+ } else {
+ logger.warn("User {} has no dictionaries", entityRef.getUuid() );
}
- } else {
- logger.warn("User {} has no dictionaries", entityRef.getUuid() );
- }
+ } else { // this is a duplicate user, so merge orgs
+
+ logger.info("Processing duplicate username={} email={}", dup.email, dup.username );
+
+ Identifier identifier = dup.email != null ?
+ Identifier.fromEmail( dup.email ) : Identifier.from( dup.username );
+ User originalUser = em.get( em.getUserByIdentifier(identifier), User.class );
+
+ // get map of original user's orgs
+
+ UserInfo originalUserInfo = managementService.getAdminUserByEmail( originalUser.getEmail() );
+ Map<String, Object> originalUserOrgData =
+ managementService.getAdminUserOrganizationData( originalUser.getUuid() );
+ Map<String, Map<String, Object>> originalUserOrgs =
+ (Map<String, Map<String, Object>>) originalUserOrgData.get( "organizations" );
+ // loop through duplicate user's orgs and give orgs to original user
+ List<Object> organizationsList = (List<Object>) metadata.get("organizations");
+ for (Object orgObject : organizationsList) {
+
+ Map<String, Object> orgMap = (Map<String, Object>) orgObject;
+ UUID orgUuid = UUID.fromString( (String) orgMap.get( "uuid" ) );
+ String orgName = (String) orgMap.get( "name" );
+
+ if (originalUserOrgs.get( orgName ) == null) { // original user does not have this org
+
+ OrganizationInfo orgInfo = managementService.getOrganizationByUuid( orgUuid );
+
+ if (orgInfo == null) { // org does not exist yet, create it and add original user to it
+ try {
+ managementService.createOrganization( orgUuid, orgName, originalUserInfo, false );
+ orgInfo = managementService.getOrganizationByUuid( orgUuid );
+
+ logger.debug( "Created new org {} for user {}:{}:{} from duplicate user {}:{}",
+ new Object[]{
+ orgInfo.getName(),
+ originalUser.getUuid(), originalUser.getUsername(), originalUser.getEmail(),
+ dup.username, dup.email
+ });
+
+ } catch (DuplicateUniquePropertyExistsException dpee) {
+ logger.debug( "Org {} already exists", orgName );
+ }
+ } else { // org exists so add original user to it
+ try {
+ managementService.addAdminUserToOrganization( originalUserInfo, orgInfo, false );
+ logger.debug( "Added to org user {}:{}:{} from duplicate user {}:{}",
+ new Object[]{
+ orgInfo.getName(),
+ originalUser.getUuid(), originalUser.getUsername(), originalUser.getEmail(),
+ dup.username, dup.email
+ });
+
+ } catch (Exception e) {
+ logger.error( "Error Adding user {} to org {}",
+ new Object[]{originalUserInfo.getEmail(), orgName} );
+ }
+ }
+
+ } // else original user already has this org
+
+ }
+ }
}
@@ -685,15 +762,15 @@ public class ImportAdmins extends ToolBase {
// Import/create the entity
UUID uuid = getId(entityProps);
- String type = getType(entityProps);
+ String type = getType( entityProps );
try {
long startTime = System.currentTimeMillis();
em.create(uuid, type, entityProps);
- logger.debug( "Imported admin user {} / {}",
- new Object[] { uuid, entityProps.get( "username" ) } );
+ logger.debug( "Imported admin user {}:{}:{}",
+ new Object[] { uuid, entityProps.get( "username" ), entityProps.get("email") } );
userCount.getAndIncrement();
auditQueue.put(entityProps);
@@ -709,19 +786,37 @@ public class ImportAdmins extends ToolBase {
}
} catch (DuplicateUniquePropertyExistsException de) {
- logger.warn("Unable to create admin user {}:{}, duplicate property {}",
- new Object[]{ uuid, entityProps.get("username"), de.getPropertyName() });
- if (logger.isDebugEnabled()) {
- logger.debug("Exception", de);
- }
+ String dupProperty = de.getPropertyName();
+ handleDuplicateAccount( em, dupProperty, entityProps );
+ continue;
+
+
} catch (Exception e) {
- e.printStackTrace();
+ logger.error("Error", e);
}
} catch (InterruptedException e) {
- e.printStackTrace();
+ logger.error( "Error", e );
}
}
}
+
+
+ private void handleDuplicateAccount(EntityManager em, String dupProperty, Map<String, Object> entityProps ) {
+
+ logger.info( "Processing duplicate user {}:{}:{} with duplicate {}", new Object[]{
+ entityProps.get( "uuid" ), entityProps.get( "username" ), entityProps.get( "email" ), dupProperty} );
+
+ UUID dupUuid = UUID.fromString( entityProps.get("uuid").toString() );
+ try {
+ dupsByDupUuid.put( dupUuid, new DuplicateUser( dupProperty, entityProps ) );
+
+ } catch (Exception e) {
+ logger.error("Error processing dup user {}:{}:{}",
+ new Object[] {entityProps.get( "username" ), entityProps.get("email"), dupUuid});
+ return;
+ }
+
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16de78d9/stack/tools/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/resources/log4j.properties b/stack/tools/src/main/resources/log4j.properties
index 6cf0a92..80c32a1 100644
--- a/stack/tools/src/main/resources/log4j.properties
+++ b/stack/tools/src/main/resources/log4j.properties
@@ -26,7 +26,7 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) [%c] - %m%n
-log4j.logger.org.apache.usergrid.tools=INFO
+log4j.logger.org.apache.usergrid.tools=DEBUG
log4j.logger.org.apache.usergrid.management.cassandra=WARN
log4j.logger.org.apache.usergrid.persistence.cassandra.DB=WARN
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16de78d9/stack/tools/src/test/java/org/apache/usergrid/tools/ExportImportAdminsTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportImportAdminsTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportImportAdminsTest.java
index 898a97d..9cce040 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportImportAdminsTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportImportAdminsTest.java
@@ -4,7 +4,7 @@
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at:223
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -26,6 +26,8 @@ import org.apache.usergrid.ServiceITSuite;
import org.apache.usergrid.management.OrganizationInfo;
import org.apache.usergrid.management.OrganizationOwnerInfo;
import org.apache.usergrid.management.UserInfo;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.utils.UUIDUtils;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
@@ -40,13 +42,13 @@ import java.io.FilenameFilter;
import java.util.*;
import static junit.framework.TestCase.assertNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.apache.usergrid.persistence.cassandra.CassandraService.MANAGEMENT_APPLICATION_ID;
+import static org.junit.Assert.*;
public class ExportImportAdminsTest {
static final Logger logger = LoggerFactory.getLogger( ExportImportAdminsTest.class );
-
+
@ClassRule
public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
@@ -146,21 +148,28 @@ public class ExportImportAdminsTest {
public void testImportAdminUsersAndOrgs() throws Exception {
// first: generate the data file with unique user and org IDs and names
+
+ // data contains three users each with a unique org, one user has a duplicate email
String rand1 = RandomStringUtils.randomAlphanumeric( 10 );
String rand2 = RandomStringUtils.randomAlphanumeric( 10 );
+ String rand3 = RandomStringUtils.randomAlphanumeric( 10 );
UUID user_uuid_1 = UUIDUtils.newTimeUUID();
UUID user_uuid_2 = UUIDUtils.newTimeUUID();
+ UUID user_uuid_3 = UUIDUtils.newTimeUUID();
UUID org_uuid_1 = UUIDUtils.newTimeUUID();
UUID org_uuid_2 = UUIDUtils.newTimeUUID();
+ UUID org_uuid_3 = UUIDUtils.newTimeUUID();
- String user_name_1 = "user_" + rand1;
- String user_name_2 = "user_" + rand2;
+ String user_name_1 = "user1_" + rand1;
+ String user_name_2 = "user2_" + rand2;
+ String user_name_3 = "user3_" + rand3;
- String org_name_1 = "org_" + rand1;
- String org_name_2 = "org_" + rand2;
+ String org_name_1 = "org1_" + rand1;
+ String org_name_2 = "org2_" + rand2;
+ String org_name_3 = "org3_" + rand3;
// loop through resource files with prefix 'admin-user' those are the data file templates
@@ -179,15 +188,19 @@ public class ExportImportAdminsTest {
fileContent = fileContent.replaceAll( "USER_UUID_1", user_uuid_1.toString() );
fileContent = fileContent.replaceAll( "USER_UUID_2", user_uuid_2.toString() );
+ fileContent = fileContent.replaceAll( "USER_UUID_3", user_uuid_3.toString() );
fileContent = fileContent.replaceAll( "ORG_UUID_1", org_uuid_1.toString() );
fileContent = fileContent.replaceAll( "ORG_UUID_2", org_uuid_2.toString() );
+ fileContent = fileContent.replaceAll( "ORG_UUID_3", org_uuid_3.toString() );
fileContent = fileContent.replaceAll( "USER_NAME_1", user_name_1 );
fileContent = fileContent.replaceAll( "USER_NAME_2", user_name_2 );
+ fileContent = fileContent.replaceAll( "USER_NAME_3", user_name_3 );
fileContent = fileContent.replaceAll( "ORG_NAME_1", org_name_1 );
fileContent = fileContent.replaceAll( "ORG_NAME_2", org_name_2 );
+ fileContent = fileContent.replaceAll( "ORG_NAME_3", org_name_3 );
FileOutputStream os = new FileOutputStream(
tempDir.getAbsolutePath() + File.separator + fileName );
@@ -200,35 +213,45 @@ public class ExportImportAdminsTest {
// import data from temp directory
ImportAdmins importAdmins = new ImportAdmins();
- importAdmins.startTool( new String[] {
- "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
- "-inputDir", tempDir.getAbsolutePath()
+ importAdmins.startTool( new String[]{
+ "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
+ "-inputDir", tempDir.getAbsolutePath()
}, false );
// verify that users and orgs were created correctly
OrganizationInfo orgInfo1 = setup.getMgmtSvc().getOrganizationByUuid( org_uuid_1 );
- assertNotNull( orgInfo1 );
+ assertNotNull( "org 1 exists", orgInfo1 );
+ List<UserInfo> org1_users = setup.getMgmtSvc().getAdminUsersForOrganization( org_uuid_1 );
+ assertEquals("org1 has one user", 1, org1_users.size() );
OrganizationInfo orgInfo2 = setup.getMgmtSvc().getOrganizationByUuid( org_uuid_2 );
- assertNotNull( orgInfo2 );
+ assertNotNull( "org 2 exists", orgInfo2 );
+ List<UserInfo> org2_users = setup.getMgmtSvc().getAdminUsersForOrganization( org_uuid_2 );
+ assertEquals( "org2 has two users", 2, org2_users.size() );
+
+ OrganizationInfo orgInfo3 = setup.getMgmtSvc().getOrganizationByUuid( org_uuid_3 );
+ assertNotNull( "org 3 exists", orgInfo3 );
+ List<UserInfo> org3_users = setup.getMgmtSvc().getAdminUsersForOrganization( org_uuid_3 );
+ assertEquals( "org 3 has 1 users", 1, org3_users.size() );
BiMap<UUID, String> user1_orgs = setup.getMgmtSvc().getOrganizationsForAdminUser( user_uuid_1 );
- assertEquals("user1 has two orgs", 2, user1_orgs.size() );
-
+ assertEquals( "user 1 has 2 orgs", 2, user1_orgs.size() );
+
BiMap<UUID, String> user2_orgs = setup.getMgmtSvc().getOrganizationsForAdminUser( user_uuid_2 );
- assertEquals("user2 has one orgs", 1, user2_orgs.size() );
+ assertEquals( "user 2 has two orgs gained one from duplicate", 2, user2_orgs.size() );
- List<UserInfo> org1_users = setup.getMgmtSvc().getAdminUsersForOrganization( org_uuid_1 );
- assertEquals("org1 has one user", 1, org1_users.size() );
+ try {
+ BiMap<UUID, String> user3_orgs = setup.getMgmtSvc().getOrganizationsForAdminUser( user_uuid_3 );
+ fail("fetch user 3 should have thrown exception");
+ } catch ( Exception expected ) {
+ logger.info("EXCEPTION EXPECTED");
+ }
- List<UserInfo> org2_users = setup.getMgmtSvc().getAdminUsersForOrganization( org_uuid_2 );
- assertEquals("org2 has two users", 2, org2_users.size() );
+ EntityManager em = setup.getEmf().getEntityManager( MANAGEMENT_APPLICATION_ID );
+ Entity user3 = em.get( user_uuid_3 );
+ assertNull( "duplicate user does not exist", user3 );
- UserInfo user1info = setup.getMgmtSvc().getAdminUserByUuid( user_uuid_1 );
- Map<String, Object> user1_data = setup.getMgmtSvc().getAdminUserOrganizationData( user1info, false );
- Map<String, Object> user1_data_orgs = (Map<String, Object>)user1_data.get("organizations");
- assertEquals( 2, user1_data_orgs.size());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16de78d9/stack/tools/src/test/resources/admin-user-metadata.usergrid-management.1433331614293.json
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/resources/admin-user-metadata.usergrid-management.1433331614293.json b/stack/tools/src/test/resources/admin-user-metadata.usergrid-management.1433331614293.json
index 320f8ed..86d7363 100644
--- a/stack/tools/src/test/resources/admin-user-metadata.usergrid-management.1433331614293.json
+++ b/stack/tools/src/test/resources/admin-user-metadata.usergrid-management.1433331614293.json
@@ -81,5 +81,57 @@
}
}
}
+ },
+ "USER_UUID_3" : {
+ "activities": [],
+ "devices": [],
+ "feed": [
+ "4c8fee64-09e5-11e5-b3c6-57cd4e12c0b1"
+ ],
+ "groups": [
+ "4c88c26a-09e5-11e5-8a66-594dd93a503d"
+ ],
+ "roles": [],
+ "connections": {},
+ "organizations": [
+ {
+ "uuid": "ORG_UUID_3",
+ "name": "ORG_NAME_3"
+ }
+ ],
+ "dictionaries": {
+ "credentials": {
+ "mongo_pwd": {
+ "recoverable": true,
+ "encrypted": false,
+ "secret": "e7b4fc7db5b97088997e44eced015d42",
+ "hashType": null,
+ "created": 1433331614067,
+ "cryptoChain": [
+ "plaintext"
+ ]
+ },
+ "password": {
+ "recoverable": false,
+ "encrypted": true,
+ "secret": "JDJhJDA5JER0RTdNSldMRjkxSUlJVm5hZWJMTy5DelFLemwvd2tXdUttaHViZWdyRjRURVdxYk5TUGJt",
+ "hashType": null,
+ "created": 1433331614018,
+ "cryptoChain": [
+ "bcrypt"
+ ]
+ },
+ "secret": {
+ "recoverable": true,
+ "encrypted": false,
+ "secret": "YWQ6Rx9A-m5U-TihpkPVS4PmyQO4qig",
+ "hashType": null,
+ "created": 1433331614067,
+ "cryptoChain": [
+ "plaintext"
+ ]
+ }
+ }
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16de78d9/stack/tools/src/test/resources/admin-users.usergrid-management.1433331614293.json
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/resources/admin-users.usergrid-management.1433331614293.json b/stack/tools/src/test/resources/admin-users.usergrid-management.1433331614293.json
index 5f192bf..3f15986 100644
--- a/stack/tools/src/test/resources/admin-users.usergrid-management.1433331614293.json
+++ b/stack/tools/src/test/resources/admin-users.usergrid-management.1433331614293.json
@@ -20,4 +20,16 @@
"activated" : true,
"confirmed" : true,
"disabled" : false
+}, {
+ "uuid" : "USER_UUID_3",
+ "type" : "user",
+ "name" : "USER_NAME_3",
+ "created" : 1433331614002,
+ "modified" : 1433331614002,
+ "username" : "USER_NAME_3",
+ "comment" : "this is a duplicate user, has same email address as user2",
+ "email" : "USER_NAME_2@example.com",
+ "activated" : true,
+ "confirmed" : true,
+ "disabled" : false
} ]
\ No newline at end of file
[05/18] incubator-usergrid git commit: Less test data and code to
compare 1 read and 1 write thread vs. 100 read and 100 write threads.
Posted by sn...@apache.org.
Less test data and code to compare 1 read and 1 write thread vs. 100 read and 100 write threads.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7b168b91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7b168b91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7b168b91
Branch: refs/heads/two-dot-o-dev
Commit: 7b168b91d99ba51da452a9c7980b0078f733df03
Parents: a25f8eb
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 16:41:06 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 16:41:06 2015 -0400
----------------------------------------------------------------------
.../apache/usergrid/tools/ExportAppTest.java | 24 ++++++++++++++------
1 file changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b168b91/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index d1c5c1f..eeaae13 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -41,9 +41,9 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ExportAppTest {
static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
- int NUM_COLLECTIONS = 20;
- int NUM_ENTITIES = 200;
- int NUM_CONNECTIONS = 5;
+ int NUM_COLLECTIONS = 10;
+ int NUM_ENTITIES = 50;
+ int NUM_CONNECTIONS = 3;
@ClassRule
public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
@@ -113,12 +113,22 @@ public class ExportAppTest {
ExportApp exportApp = new ExportApp();
exportApp.startTool( new String[]{
"-application", appInfo.getName(),
- "-readThreads", "50",
- "-writeThreads", "10",
+ "-readThreads", "100",
+ "-writeThreads", "100",
"-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
"-outputDir", directoryName
}, false );
-
- logger.info("time = " + (System.currentTimeMillis() - start)/1000 + "s");
+
+ logger.info("100 read and 100 write threads = " + (System.currentTimeMillis() - start)/1000 + "s");
+
+ exportApp.startTool( new String[]{
+ "-application", appInfo.getName(),
+ "-readThreads", "1",
+ "-writeThreads", "1",
+ "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
+ "-outputDir", directoryName + "1"
+ }, false );
+
+ logger.info("1 thread time = " + (System.currentTimeMillis() - start)/1000 + "s");
}
}
\ No newline at end of file
[02/18] incubator-usergrid git commit: Add readThread and writeThread
CLI parameters.
Posted by sn...@apache.org.
Add readThread and writeThread CLI parameters.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2b65e619
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2b65e619
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2b65e619
Branch: refs/heads/two-dot-o-dev
Commit: 2b65e619316b206720e574a21fe1b33462e0f2dd
Parents: b2bdbb5
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 08:03:18 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 08:03:18 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportApp.java | 153 +++++++++++++------
1 file changed, 106 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2b65e619/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index ceb3ecd..21c63a0 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.utils.StringUtils;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
@@ -46,11 +47,22 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
- * Export application's collections.
+ * Export all entities and connections of a Usergrid app.
+ *
+ * Exports data files to specified directory.
+ *
+ * Will create as many output files as there are writeThreads (by default: 10).
+ *
+ * Will create two types of files: *.uge for Usegrird entities and *.ugc for entity to entity connections.
+ *
+ * Every line of the data files is a complete JSON object.
*/
public class ExportApp extends ExportingToolBase {
static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
+ private static final String READ_THREAD_COUNT = "readThreads";
+ private static final String WRITE_THREAD_COUNT = "writeThreads";
+
// we will write two types of files: entities and connections
BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
@@ -58,10 +70,15 @@ public class ExportApp extends ExportingToolBase {
static final String APPLICATION_NAME = "application";
int pollTimeoutSeconds = 10;
+
+ int readThreadCount = 80;
+ int writeThreadCount = 10;
+
+ String applicationName;
+ String organizationName;
// limiting output threads will limit output files
- final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(8);
- final Scheduler scheduler = Schedulers.from( threadPoolExecutor );
+ Scheduler readScheduler;
Map<Thread, JsonGenerator> entityGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
@@ -77,29 +94,37 @@ public class ExportApp extends ExportingToolBase {
ObjectMapper mapper = new ObjectMapper();
/**
- * Export admin users using multiple threads.
- * <p/>
- * How it works:
- * In main thread we query for IDs of all admin users, add each ID to read queue.
- * Read-queue workers read admin user data, add data to write queue.
- * One write-queue worker reads data writes to file.
+ * Tool entry point.
*/
@Override
public void runTool(CommandLine line) throws Exception {
- String applicationName = line.getOptionValue( APPLICATION_NAME );
-
-// if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
-// try {
-// readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
-// } catch (NumberFormatException nfe) {
-// logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
-// return;
-// }
-// } else {
-// readThreadCount = 20;
-// }
+ applicationName = line.getOptionValue( APPLICATION_NAME );
+
+ if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
+ try {
+ readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
+ } catch (NumberFormatException nfe) {
+ logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+ return;
+ }
+ }
+ if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
+ try {
+ writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
+ } catch (NumberFormatException nfe) {
+ logger.error( "-" + WRITE_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+ return;
+ }
+ }
+
+ ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
+ readScheduler = Schedulers.from( readThreadPoolExecutor );
+
+ ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
+ final Scheduler writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+
startSpring();
setVerbose( line );
@@ -111,6 +136,7 @@ public class ExportApp extends ExportingToolBase {
UUID applicationId = emf.lookupApplication( applicationName );
final EntityManager em = emf.getEntityManager( applicationId );
+ organizationName = em.getApplication().getOrganizationName();
// start write queue workers
@@ -119,18 +145,18 @@ public class ExportApp extends ExportingToolBase {
entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
public Observable<ExportEntity> call(ExportEntity exportEntity) {
return Observable.just(exportEntity).doOnNext(
- new EntityWriteAction() ).subscribeOn( scheduler );
+ new EntityWriteAction() ).subscribeOn( writeScheduler );
}
- },10).subscribeOn( scheduler ).subscribe();
+ },10).subscribeOn( writeScheduler ).subscribe();
ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
public Observable<ExportConnection> call(ExportConnection connection ) {
return Observable.just(connection).doOnNext(
- new ConnectionWriteAction()).subscribeOn( scheduler );
+ new ConnectionWriteAction()).subscribeOn( writeScheduler );
}
- },10).subscribeOn( scheduler ).subscribe();
+ },10).subscribeOn( writeScheduler ).subscribe();
// start processing data and filling up write queues
@@ -139,9 +165,9 @@ public class ExportApp extends ExportingToolBase {
collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
public Observable<String> call(String collection) {
return Observable.just(collection).doOnNext(
- new CollectionAction( em ) ).subscribeOn( Schedulers.io() );
+ new CollectionAction( em ) ).subscribeOn( readScheduler );
}
- },40).subscribeOn( Schedulers.io() ).subscribe();
+ },40).subscribeOn( readScheduler ).subscribe();
// wait for write thread pollers to get started
@@ -192,14 +218,18 @@ public class ExportApp extends ExportingToolBase {
Options options = super.createOptions();
- Option readThreads = OptionBuilder.hasArg().withType("")
+ Option appNameOption = OptionBuilder.hasArg().withType("")
.withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
- options.addOption( readThreads );
-
-// Option readThreads = OptionBuilder
-// .hasArg().withType(0).withDescription("Read Threads -" + READ_THREAD_COUNT).create(READ_THREAD_COUNT);
-// options.addOption( readThreads );
-
+ options.addOption( appNameOption );
+
+ Option readThreadsOption = OptionBuilder
+ .hasArg().withType(0).withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+ options.addOption( readThreadsOption );
+
+ Option writeThreadsOption = OptionBuilder
+ .hasArg().withType(0).withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+ options.addOption( writeThreadsOption );
+
return options;
}
@@ -276,9 +306,13 @@ public class ExportApp extends ExportingToolBase {
}
dictionariesByName.put( dictionary, dict );
}
+
ExportEntity exportEntity = new ExportEntity(
- em.getApplication().getApplicationName(),
- entity, dictionariesByName );
+ organizationName,
+ applicationName,
+ entity,
+ dictionariesByName );
+
subscriber.onNext( exportEntity );
count++;
@@ -333,11 +367,14 @@ public class ExportApp extends ExportingToolBase {
for (Entity connectedEntity : results.getEntities()) {
try {
+
ExportConnection connection = new ExportConnection(
- em.getApplication().getApplicationName(),
+ applicationName,
+ organizationName,
connectionType,
exportEntity.getEntity().getUuid(),
connectedEntity.getUuid());
+
subscriber.onNext( connection );
count++;
@@ -379,9 +416,9 @@ public class ExportApp extends ExportingToolBase {
entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
public Observable<ExportEntity> call(ExportEntity exportEntity) {
return Observable.just(exportEntity).doOnNext(
- new EntityAction( em ) ).subscribeOn( Schedulers.io() );
+ new EntityAction( em ) ).subscribeOn( readScheduler );
}
- }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+ }, 8).subscribeOn(readScheduler).toBlocking().last();
}
}
@@ -413,9 +450,9 @@ public class ExportApp extends ExportingToolBase {
entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
public Observable<ExportConnection> call(ExportConnection connection) {
return Observable.just(connection).doOnNext(
- new ConnectionsAction() ).subscribeOn( Schedulers.io() );
+ new ConnectionsAction() ).subscribeOn(readScheduler);
}
- }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+ }, 8).subscribeOn(readScheduler).toBlocking().last();
}
} catch (Exception e) {
@@ -472,7 +509,7 @@ public class ExportApp extends ExportingToolBase {
count++;
}
- logger.info("Done. Wrote {} entities", count);
+ logger.info("Done. De-queued {} entities", count);
if ( count > 0 ) {
subscriber.onCompleted();
} else {
@@ -513,7 +550,7 @@ public class ExportApp extends ExportingToolBase {
count++;
}
- logger.info("Done. Wrote {} connections", count);
+ logger.info("Done. De-queued {} connections", count);
if ( count > 0 ) {
subscriber.onCompleted();
} else {
@@ -531,7 +568,8 @@ public class ExportApp extends ExportingToolBase {
boolean wroteData = false;
- String fileName = "target/" + Thread.currentThread().getName() + ".ude";
+ String [] parts = Thread.currentThread().getName().split("-");
+ String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".entities";
JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
if ( gen == null ) {
@@ -572,7 +610,8 @@ public class ExportApp extends ExportingToolBase {
boolean wroteData = false;
- String fileName = "target/" + Thread.currentThread().getName() + ".ugc";
+ String [] parts = Thread.currentThread().getName().split("-");
+ String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".connections";
JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
if ( gen == null ) {
@@ -607,10 +646,12 @@ public class ExportApp extends ExportingToolBase {
}
class ExportEntity {
+ private String organization;
private String application;
private Entity entity;
private Map<String, Object> dictionaries;
- public ExportEntity( String application, Entity entity, Map<String, Object> dictionaries ) {
+ public ExportEntity( String organization, String application, Entity entity, Map<String, Object> dictionaries ) {
+ this.organization = organization;
this.application = application;
this.entity = entity;
this.dictionaries = dictionaries;
@@ -639,14 +680,24 @@ class ExportEntity {
public void setDictionaries(Map<String, Object> dictionaries) {
this.dictionaries = dictionaries;
}
+
+ public String getOrganization() {
+ return organization;
+ }
+
+ public void setOrganization(String organization) {
+ this.organization = organization;
+ }
}
class ExportConnection {
+ private String organization;
private String application;
private String connectionType;
private UUID sourceUuid;
private UUID targetUuid;
- public ExportConnection(String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+ public ExportConnection(String organization, String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+ this.organization= organization;
this.application = application;
this.connectionType = connectionType;
this.sourceUuid = sourceUuid;
@@ -684,4 +735,12 @@ class ExportConnection {
public void setTargetUuid(UUID targetUuid) {
this.targetUuid = targetUuid;
}
+
+ public String getOrganization() {
+ return organization;
+ }
+
+ public void setOrganization(String organization) {
+ this.organization = organization;
+ }
}
[04/18] incubator-usergrid git commit: Some reformatting. Also
eliminating use of subscriber.unsubscribe(). All observables need to wrap up
with onCompleted().
Posted by sn...@apache.org.
Some reformatting. Also eliminating use of subscriber.unsubscribe(). All observables need to wrap up with onCompleted().
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a25f8ebc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a25f8ebc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a25f8ebc
Branch: refs/heads/two-dot-o-dev
Commit: a25f8ebc2877681897a5ef945d39b685c2d3f9fa
Parents: 7a870d6
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 15:31:07 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 15:31:07 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportApp.java | 113 ++++++++++---------
stack/tools/src/main/resources/log4j.properties | 5 +-
.../apache/usergrid/tools/ExportAppTest.java | 89 ++++-----------
3 files changed, 81 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a25f8ebc/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index 59509c0..c302a74 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -16,7 +16,6 @@
*/
package org.apache.usergrid.tools;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
@@ -82,8 +81,30 @@ public class ExportApp extends ExportingToolBase {
// set via CLI
int readThreadCount = 80;
int writeThreadCount = 10; // limiting write will limit output files
-
+
+ @Override
+ @SuppressWarnings("static-access")
+ public Options createOptions() {
+
+ Options options = super.createOptions();
+
+ Option appNameOption = OptionBuilder.hasArg().withType("")
+ .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
+ options.addOption( appNameOption );
+
+ Option readThreadsOption = OptionBuilder.hasArg().withType(0)
+ .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+ options.addOption( readThreadsOption );
+
+ Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
+ .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+ options.addOption( writeThreadsOption );
+
+ return options;
+ }
+
+
/**
* Tool entry point.
*/
@@ -110,25 +131,25 @@ public class ExportApp extends ExportingToolBase {
}
}
- ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
- readScheduler = Schedulers.from( readThreadPoolExecutor );
-
- ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
- writeScheduler = Schedulers.from( writeThreadPoolExecutor );
-
- startSpring();
-
setVerbose( line );
applyOrgId( line );
prepareBaseOutputFileName( line );
outputDir = createOutputParentDir();
logger.info( "Export directory: " + outputDir.getAbsolutePath() );
-
+
+ startSpring();
+
UUID applicationId = emf.lookupApplication( applicationName );
final EntityManager em = emf.getEntityManager( applicationId );
organizationName = em.getApplication().getOrganizationName();
+ ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
+ readScheduler = Schedulers.from( readThreadPoolExecutor );
+
+ ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
+ writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+
Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
@@ -151,30 +172,11 @@ public class ExportApp extends ExportingToolBase {
.toBlocking().last();
}
- @Override
- @SuppressWarnings("static-access")
- public Options createOptions() {
-
- Options options = super.createOptions();
-
- Option appNameOption = OptionBuilder.hasArg().withType("")
- .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
- options.addOption( appNameOption );
-
- Option readThreadsOption = OptionBuilder.hasArg().withType(0)
- .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
- options.addOption( readThreadsOption );
-
- Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
- .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
- options.addOption( writeThreadsOption );
-
- return options;
- }
// ----------------------------------------------------------------------------------------
// reading data
+
/**
* Emits collection names found in application.
*/
@@ -198,16 +200,13 @@ public class ExportApp extends ExportingToolBase {
} catch (Exception e) {
subscriber.onError( e );
}
- if ( count > 0 ) {
- subscriber.onCompleted();
- logger.info( "Completed. Read {} collection names", count );
- } else {
- subscriber.unsubscribe();
- logger.info( "No collections found" );
- }
+
+ subscriber.onCompleted();
+ logger.info( "Completed. Read {} collection names", count );
}
}
+
/**
* Emits entities of collection.
*/
@@ -267,13 +266,8 @@ public class ExportApp extends ExportingToolBase {
results = em.searchCollection( em.getApplicationRef(), collection, query );
}
- if ( count > 0 ) {
- subscriber.onCompleted();
- logger.info("Completed collection {}. Read {} entities", collection, count);
- } else {
- logger.info("Completed collection {} empty", collection );
- subscriber.unsubscribe();
- }
+ subscriber.onCompleted();
+ logger.info("Completed collection {}. Read {} entities", collection, count);
} catch ( Exception e ) {
subscriber.onError(e);
@@ -281,6 +275,7 @@ public class ExportApp extends ExportingToolBase {
}
}
+
/**
* Emits connections of an entity.
*/
@@ -331,19 +326,17 @@ public class ExportApp extends ExportingToolBase {
subscriber.onError( e );
}
- if ( count > 0 ) {
- subscriber.onCompleted();
- logger.info("Completed entity {} type {} connections count {}",
- new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
-
- } else {
- subscriber.unsubscribe();
- logger.info( "Entity {} type {} has no connections",
- exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
- }
+ subscriber.onCompleted();
+ logger.info("Completed entity {} type {} connections count {}",
+ new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
}
}
+
+ // ----------------------------------------------------------------------------------------
+ // writing data
+
+
/**
* Writes entities to JSON file.
*/
@@ -381,6 +374,7 @@ public class ExportApp extends ExportingToolBase {
}
}
+
/**
* Writes connection to JSON file.
*/
@@ -418,6 +412,7 @@ public class ExportApp extends ExportingToolBase {
}
}
+
private class FileWrapUpAction implements Action0 {
@Override
public void call() {
@@ -448,6 +443,10 @@ public class ExportApp extends ExportingToolBase {
}
}
+
+/**
+ * Represents entity data to be serialized to JSON.
+ */
class ExportEntity {
private String organization;
private String application;
@@ -493,6 +492,10 @@ class ExportEntity {
}
}
+
+/**
+ * Represents connection data to be serialized to JSON.
+ */
class ExportConnection {
private String organization;
private String application;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a25f8ebc/stack/tools/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/resources/log4j.properties b/stack/tools/src/main/resources/log4j.properties
index 6cf0a92..def47b4 100644
--- a/stack/tools/src/main/resources/log4j.properties
+++ b/stack/tools/src/main/resources/log4j.properties
@@ -18,7 +18,7 @@
# and the pattern to %c instead of %l. (%l is slower.)
# output messages into a rolling log file as well as stdout
-log4j.rootLogger=WARN,stdout
+log4j.rootLogger=ERROR,stdout
# stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
@@ -26,7 +26,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) [%c] - %m%n
-log4j.logger.org.apache.usergrid.tools=INFO
+log4j.logger.org.apache.usergrid=INFO
+log4j.logger.org.apache.usergrid.tools=DEBUG
log4j.logger.org.apache.usergrid.management.cassandra=WARN
log4j.logger.org.apache.usergrid.persistence.cassandra.DB=WARN
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a25f8ebc/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index af8306f..d1c5c1f 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -27,10 +27,7 @@ import org.apache.usergrid.persistence.EntityManager;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import rx.Observable;
import rx.Scheduler;
-import rx.functions.Action1;
-import rx.functions.Func1;
import rx.schedulers.Schedulers;
import java.util.ArrayList;
@@ -44,9 +41,9 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ExportAppTest {
static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
- int NUM_COLLECTIONS = 5;
- int NUM_ENTITIES = 10;
- int NUM_CONNECTIONS = 1;
+ int NUM_COLLECTIONS = 20;
+ int NUM_ENTITIES = 200;
+ int NUM_CONNECTIONS = 5;
@ClassRule
public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
@@ -86,68 +83,25 @@ public class ExportAppTest {
ExecutorService execService = Executors.newFixedThreadPool( 50);
final Scheduler scheduler = Schedulers.from( execService );
- Observable.range( 0, NUM_COLLECTIONS ).flatMap( new Func1<Integer, Observable<?>>() {
- @Override
- public Observable<?> call(Integer i) {
-
- return Observable.just( i ).doOnNext( new Action1<Integer>() {
- @Override
- public void call(Integer i) {
-
- final String type = "thing_"+i;
- try {
- em.createApplicationCollection( type );
- connectionCount.getAndIncrement();
-
- } catch (Exception e) {
- throw new RuntimeException( "Error creating collection", e );
- }
-
- Observable.range( 0, NUM_ENTITIES ).flatMap( new Func1<Integer, Observable<?>>() {
- @Override
- public Observable<?> call(Integer j) {
- return Observable.just( j ).doOnNext( new Action1<Integer>() {
- @Override
- public void call(Integer j) {
-
- final String name = "thing_" + j;
- try {
- final Entity source = em.create(
- type, new HashMap<String, Object>() {{ put("name", name); }});
- entitiesCount.getAndIncrement();
- logger.info( "Created entity {} type {}", name, type );
-
- for ( Entity target : connectedThings ) {
- em.createConnection( source, "has", target );
- connectionCount.getAndIncrement();
- logger.info( "Created connection from entity {} type {} to {}",
- new Object[]{name, type, target.getName()} );
- }
-
-
- } catch (Exception e) {
- throw new RuntimeException( "Error creating collection", e );
- }
-
-
- }
-
- } );
-
- }
- }, 50 ).subscribeOn( scheduler ).subscribe(); // toBlocking().last();
-
- }
- } );
-
+ for (int i = 0; i < NUM_COLLECTIONS; i++) {
- }
- }, 30 ).subscribeOn( scheduler ).toBlocking().last();
+ final String type = "thing_" + i;
+ em.createApplicationCollection( type );
+ connectionCount.getAndIncrement();
+
+ for (int j = 0; j < NUM_ENTITIES; j++) {
+ final String name = "thing_" + j;
+ final Entity source = em.create(
+ type, new HashMap<String, Object>() {{
+ put( "name", name );
+ }} );
+ entitiesCount.getAndIncrement();
- while ( entitiesCount.get() < NUM_COLLECTIONS * NUM_ENTITIES ) {
- Thread.sleep( 5000 );
- logger.info( "Still working. Created {} entities and {} connections",
- entitiesCount.get(), connectionCount.get() );
+ for (Entity target : connectedThings) {
+ em.createConnection( source, "has", target );
+ connectionCount.getAndIncrement();
+ }
+ }
}
logger.info( "Done. Created {} entities and {} connections", entitiesCount.get(), connectionCount.get() );
@@ -166,8 +120,5 @@ public class ExportAppTest {
}, false );
logger.info("time = " + (System.currentTimeMillis() - start)/1000 + "s");
-
-
-
}
}
\ No newline at end of file
[11/18] incubator-usergrid git commit: Don't ignore broken user
accounts (i.e. no creds or orgs), export everything.
Posted by sn...@apache.org.
Don't ignore broken user accounts (i.e. no creds or orgs), export everything.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/897fd508
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/897fd508
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/897fd508
Branch: refs/heads/two-dot-o-dev
Commit: 897fd5083feeab041fd44453abeb2db57efa4614
Parents: 4f8aa2f
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Jul 20 10:37:54 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Jul 20 10:37:54 2015 -0400
----------------------------------------------------------------------
.../src/main/java/org/apache/usergrid/tools/ExportAdmins.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/897fd508/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
index 9dce862..d5dd42c 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
@@ -80,6 +80,8 @@ public class ExportAdmins extends ExportingToolBase {
private int readThreadCount;
AtomicInteger userCount = new AtomicInteger( 0 );
+
+ boolean ignoreInvalidUsers = false; // true to ignore users with no credentials or orgs
/**
@@ -335,9 +337,10 @@ public class ExportAdmins extends ExportingToolBase {
String actionTaken = "Processed";
- if (task.orgNamesByUuid.isEmpty()
+ if (ignoreInvalidUsers && (task.orgNamesByUuid.isEmpty()
|| task.dictionariesByName.isEmpty()
- || task.dictionariesByName.get( "credentials" ).isEmpty()) {
+ || task.dictionariesByName.get( "credentials" ).isEmpty())) {
+
actionTaken = "Ignored";
} else {
[09/18] incubator-usergrid git commit: Ignore users with no orgs or
creds, log total orgs exported and better logging.
Posted by sn...@apache.org.
Ignore users with no orgs or creds, log total orgs exported and better logging.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/125ffe98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/125ffe98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/125ffe98
Branch: refs/heads/two-dot-o-dev
Commit: 125ffe9893c617a81b26a9aaaf1f13460fb01ca6
Parents: 16de78d
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Jul 17 11:28:07 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Jul 17 11:28:07 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportAdmins.java | 117 ++++++++++++-------
.../org/apache/usergrid/tools/ImportAdmins.java | 2 +-
2 files changed, 78 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/125ffe98/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
index ae9c16b..9dce862 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
@@ -61,15 +61,25 @@ import static org.apache.usergrid.persistence.cassandra.CassandraService.MANAGEM
* cassandra.lock.keyspace=My_Usergrid_Locks
*/
public class ExportAdmins extends ExportingToolBase {
-
+
static final Logger logger = LoggerFactory.getLogger( ExportAdmins.class );
+
public static final String ADMIN_USERS_PREFIX = "admin-users";
public static final String ADMIN_USER_METADATA_PREFIX = "admin-user-metadata";
+
+ // map admin user UUID to list of organizations to which user belongs
+ private Map<UUID, List<Org>> userToOrgsMap = new HashMap<UUID, List<Org>>(50000);
+
+ private Map<String, UUID> orgNameToUUID = new HashMap<String, UUID>(50000);
+
+ private Set<UUID> orgsWritten = new HashSet<UUID>(50000);
+
+ private Set<UUID> duplicateOrgs = new HashSet<UUID>();
+
private static final String READ_THREAD_COUNT = "readThreads";
- private Map<String, List<Org>> orgMap = new HashMap<String, List<Org>>(80000);
private int readThreadCount;
-
- AtomicInteger count = new AtomicInteger( 0 );
+
+ AtomicInteger userCount = new AtomicInteger( 0 );
/**
@@ -173,7 +183,7 @@ public class ExportAdmins extends ExportingToolBase {
while ( !done ) {
writeThread.join( 10000, 0 );
done = !writeThread.isAlive();
- logger.info( "Wrote {} users", count.get() );
+ logger.info( "Wrote {} users", userCount.get() );
}
}
@@ -211,10 +221,11 @@ public class ExportAdmins extends ExportingToolBase {
organizations = em.searchCollection( em.getApplicationRef(), "groups", query );
for ( Entity organization : organizations.getEntities() ) {
execService.submit( new OrgMapWorker( organization ) );
+ count++;
}
- count++;
+
if ( count % 1000 == 0 ) {
- logger.info("Processed {} orgs for org map", count);
+ logger.info("Queued {} org map workers", count);
}
query.setCursor( organizations.getCursor() );
}
@@ -222,8 +233,10 @@ public class ExportAdmins extends ExportingToolBase {
execService.shutdown();
while ( !execService.awaitTermination( 10, TimeUnit.SECONDS ) ) {
- logger.info("Processed {} orgs for map", orgMap.size() );
+ logger.info( "Processed {} orgs for map", userToOrgsMap.size() );
}
+
+ logger.info("Org map complete, counted {} organizations", count);
}
@@ -239,17 +252,33 @@ public class ExportAdmins extends ExportingToolBase {
try {
final String orgName = orgEntity.getProperty( "path" ).toString();
final UUID orgId = orgEntity.getUuid();
+
for (UserInfo user : managementService.getAdminUsersForOrganization( orgEntity.getUuid() )) {
try {
Entity admin = managementService.getAdminUserEntityByUuid( user.getUuid() );
- List<Org> orgs = orgMap.get( admin.getProperty( "username" ) );
- if (orgs == null) {
- orgs = new ArrayList<Org>();
- orgMap.put( admin.getProperty( "username" ).toString().toLowerCase(), orgs );
+ Org org = new Org( orgId, orgName );
+
+ synchronized (userToOrgsMap) {
+ List<Org> userOrgs = userToOrgsMap.get( admin.getUuid() );
+ if (userOrgs == null) {
+ userOrgs = new ArrayList<Org>();
+ userToOrgsMap.put( admin.getUuid(), userOrgs );
+ }
+ userOrgs.add( org );
}
- orgs.add( new Org( orgId, orgName ) );
- //logger.debug("Added org {} for user {}", orgName, admin.getProperty( "username" ));
+ synchronized (orgNameToUUID) {
+ UUID existingOrgId = orgNameToUUID.get( orgName );
+ ;
+ if (existingOrgId != null && !orgId.equals( existingOrgId )) {
+ if ( !duplicateOrgs.contains( orgId )) {
+ logger.info( "Org {}:{} is a duplicate", orgId, orgName );
+ duplicateOrgs.add(orgId);
+ }
+ } else {
+ orgNameToUUID.put( orgName, orgId );
+ }
+ }
} catch (Exception e) {
logger.warn( "Cannot get orgs for userId {}", user.getUuid() );
@@ -301,10 +330,33 @@ public class ExportAdmins extends ExportingToolBase {
AdminUserWriteTask task = new AdminUserWriteTask();
task.adminUser = entity;
- addDictionariesToTask( task, entity );
+ addDictionariesToTask( task, entity );
addOrganizationsToTask( task );
- writeQueue.add( task );
+ String actionTaken = "Processed";
+
+ if (task.orgNamesByUuid.isEmpty()
+ || task.dictionariesByName.isEmpty()
+ || task.dictionariesByName.get( "credentials" ).isEmpty()) {
+ actionTaken = "Ignored";
+
+ } else {
+ writeQueue.add( task );
+ }
+
+ Map<String, Object> creds = (Map<String, Object>) (task.dictionariesByName.isEmpty() ?
+ 0 : task.dictionariesByName.get( "credentials" ));
+
+ logger.error( "{} admin user {}:{}:{} has organizations={} dictionaries={} credentials={}",
+ new Object[]{
+ actionTaken,
+ task.adminUser.getProperty( "username" ),
+ task.adminUser.getProperty( "email" ),
+ task.adminUser.getUuid(),
+ task.orgNamesByUuid.size(),
+ task.dictionariesByName.size(),
+ creds == null ? 0 : creds.size()
+ } );
} catch ( Exception e ) {
logger.error("Error reading data for user " + uuid, e );
@@ -327,20 +379,8 @@ public class ExportAdmins extends ExportingToolBase {
Map<Object, Object> credentialsDictionary = em.getDictionaryAsMap( entity, "credentials" );
- if ( credentialsDictionary != null && !credentialsDictionary.isEmpty() ) {
+ if ( credentialsDictionary != null ) {
task.dictionariesByName.put( "credentials", credentialsDictionary );
-
- if (credentialsDictionary.get( "password" ) == null) {
- logger.error( "User {}:{} has no password in credential dictionary",
- new Object[]{task.adminUser.getName(), task.adminUser.getUuid()} );
- }
- if (credentialsDictionary.get( "secret" ) == null) {
- logger.error( "User {}:{} has no secret in credential dictionary",
- new Object[]{task.adminUser.getName(), task.adminUser.getUuid()} );
- }
- } else {
- logger.error( "User {}:{} has no or empty credentials dictionary",
- new Object[]{task.adminUser.getName(), task.adminUser.getUuid()} );
}
}
@@ -348,22 +388,17 @@ public class ExportAdmins extends ExportingToolBase {
task.orgNamesByUuid = managementService.getOrganizationsForAdminUser( task.adminUser.getUuid() );
- List<Org> orgs = orgMap.get( task.adminUser.getProperty( "username" ).toString().toLowerCase() );
+ List<Org> orgs = userToOrgsMap.get( task.adminUser.getProperty( "username" ).toString().toLowerCase() );
if ( orgs != null && task.orgNamesByUuid.size() < orgs.size() ) {
+
+ // list of orgs from getOrganizationsForAdminUser() is less than expected, use userToOrgsMap
BiMap<UUID, String> bimap = HashBiMap.create();
for (Org org : orgs) {
bimap.put( org.orgId, org.orgName );
}
task.orgNamesByUuid = bimap;
}
-
- if ( task.orgNamesByUuid.isEmpty() ) {
- logger.error("{}:{}:{} has no orgs", new Object[] {
- task.adminUser.getProperty("username"),
- task.adminUser.getProperty("email"),
- task.adminUser.getUuid() } );
- }
}
}
@@ -425,7 +460,7 @@ public class ExportAdmins extends ExportingToolBase {
task.adminUser.getProperty("email"),
task.adminUser.getUuid() } );
- count.addAndGet( 1 );
+ userCount.addAndGet( 1 );
} catch (InterruptedException e) {
throw new Exception("Interrupted", e);
@@ -438,7 +473,7 @@ public class ExportAdmins extends ExportingToolBase {
usersFile.writeEndArray();
usersFile.close();
- logger.info( "Exported TOTAL {} admin users", count );
+ logger.info( "Exported TOTAL {} admin users and {} organizations", userCount.get(), orgsWritten.size() );
}
@@ -490,7 +525,9 @@ public class ExportAdmins extends ExportingToolBase {
jg.writeEndObject();
- logger.debug( "Exported organization {}:{}", uuid, orgs.get( uuid ) );
+ synchronized (orgsWritten) {
+ orgsWritten.add( uuid );
+ }
}
jg.writeEndArray();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/125ffe98/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
index c6aada7..7e08a4c 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
@@ -636,7 +636,7 @@ public class ImportAdmins extends ToolBase {
long duration = stopTime - startTime;
durationSum += duration;
- //logger.debug( "Audited {}th admin", count );
+ //logger.debug( "Audited {}th admin", userCount );
if ( count % 100 == 0 ) {
logger.info( "Audited {}. Average Audit Rate: {}(ms)", count, durationSum / count );
[06/18] incubator-usergrid git commit: Minor test fixes.
Posted by sn...@apache.org.
Minor test fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/dab84e95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/dab84e95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/dab84e95
Branch: refs/heads/two-dot-o-dev
Commit: dab84e95b54ff9ac573cfa291708938b5e181b61
Parents: 7b168b9
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 15 12:34:26 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 15 12:34:26 2015 -0400
----------------------------------------------------------------------
.../apache/usergrid/tools/ExportAppTest.java | 33 ++++++++++++++++++--
1 file changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dab84e95/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index eeaae13..c5411fd 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -24,12 +24,15 @@ import org.apache.usergrid.management.ApplicationInfo;
import org.apache.usergrid.management.OrganizationOwnerInfo;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
+import org.junit.Assert;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Scheduler;
import rx.schedulers.Schedulers;
+import java.io.File;
+import java.io.FileFilter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -37,7 +40,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * TODO: better test, this is really just a smoke test.
+ */
public class ExportAppTest {
static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
@@ -119,8 +128,15 @@ public class ExportAppTest {
"-outputDir", directoryName
}, false );
- logger.info("100 read and 100 write threads = " + (System.currentTimeMillis() - start)/1000 + "s");
+ logger.info( "100 read and 100 write threads = " + (System.currentTimeMillis() - start) / 1000 + "s" );
+
+ File exportDir = new File(directoryName);
+ assertTrue( getFileCount( exportDir, "entities" ) > 0 );
+ assertTrue( getFileCount( exportDir, "collections" ) > 0 );
+ assertTrue( getFileCount( exportDir, "entities" ) >= 100 );
+ assertTrue( getFileCount( exportDir, "collections" ) >= 100 );
+ File exportDir1 = new File(directoryName + "1");
exportApp.startTool( new String[]{
"-application", appInfo.getName(),
"-readThreads", "1",
@@ -129,6 +145,19 @@ public class ExportAppTest {
"-outputDir", directoryName + "1"
}, false );
- logger.info("1 thread time = " + (System.currentTimeMillis() - start)/1000 + "s");
+ logger.info( "1 thread time = " + (System.currentTimeMillis() - start) / 1000 + "s" );
+
+ exportDir = new File(directoryName);
+ assertEquals( 1, getFileCount( exportDir, "entities" ));
+ assertEquals( 1, getFileCount( exportDir, "collections" ));
+ }
+
+ private static int getFileCount(File exportDir, final String ext ) {
+ return exportDir.listFiles( new FileFilter() {
+ @Override
+ public boolean accept(File pathname) {
+ return pathname.getAbsolutePath().endsWith("." + ext);
+ }
+ } ).length;
}
}
\ No newline at end of file
[03/18] incubator-usergrid git commit: Remove queues and make the
whole thing one "stream"
Posted by sn...@apache.org.
Remove queues and make the whole thing one "stream"
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7a870d69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7a870d69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7a870d69
Branch: refs/heads/two-dot-o-dev
Commit: 7a870d6929cea76f5bbec9aa8f5a8caa8dee07e4
Parents: 2b65e61
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 13:31:18 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 13:31:18 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportApp.java | 375 +++++--------------
.../usergrid/tools/ExportingToolBase.java | 2 +-
.../apache/usergrid/tools/ExportAppTest.java | 114 +++++-
3 files changed, 185 insertions(+), 306 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index 21c63a0..59509c0 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
+import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -42,7 +43,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@@ -59,40 +61,29 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class ExportApp extends ExportingToolBase {
static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
-
- private static final String READ_THREAD_COUNT = "readThreads";
- private static final String WRITE_THREAD_COUNT = "writeThreads";
-
- // we will write two types of files: entities and connections
- BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
- BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
static final String APPLICATION_NAME = "application";
-
- int pollTimeoutSeconds = 10;
-
- int readThreadCount = 80;
- int writeThreadCount = 10;
-
+ private static final String READ_THREAD_COUNT = "readThreads";
+ private static final String WRITE_THREAD_COUNT = "writeThreads";
+
String applicationName;
String organizationName;
- // limiting output threads will limit output files
+ AtomicInteger entitiesWritten = new AtomicInteger(0);
+ AtomicInteger connectionsWritten = new AtomicInteger(0);
+
Scheduler readScheduler;
+ Scheduler writeScheduler;
+ ObjectMapper mapper = new ObjectMapper();
Map<Thread, JsonGenerator> entityGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
- List<String> emptyFiles = new ArrayList<String>();
+ // set via CLI
+ int readThreadCount = 80;
+ int writeThreadCount = 10; // limiting write will limit output files
- AtomicInteger activePollers = new AtomicInteger(0);
- AtomicInteger entitiesQueued = new AtomicInteger(0);
- AtomicInteger entitiesWritten = new AtomicInteger(0);
- AtomicInteger connectionsWritten = new AtomicInteger(0);
- AtomicInteger connectionsQueued = new AtomicInteger(0);
- ObjectMapper mapper = new ObjectMapper();
-
/**
* Tool entry point.
*/
@@ -123,7 +114,7 @@ public class ExportApp extends ExportingToolBase {
readScheduler = Schedulers.from( readThreadPoolExecutor );
ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
- final Scheduler writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+ writeScheduler = Schedulers.from( writeThreadPoolExecutor );
startSpring();
@@ -138,78 +129,26 @@ public class ExportApp extends ExportingToolBase {
final EntityManager em = emf.getEntityManager( applicationId );
organizationName = em.getApplication().getOrganizationName();
- // start write queue workers
-
- EntityWritesOnSubscribe entityWritesOnSub = new EntityWritesOnSubscribe( entityWriteQueue );
- rx.Observable entityWritesObservable = rx.Observable.create( entityWritesOnSub );
- entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
- public Observable<ExportEntity> call(ExportEntity exportEntity) {
- return Observable.just(exportEntity).doOnNext(
- new EntityWriteAction() ).subscribeOn( writeScheduler );
- }
- },10).subscribeOn( writeScheduler ).subscribe();
-
- ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
- rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
- connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
- public Observable<ExportConnection> call(ExportConnection connection ) {
- return Observable.just(connection).doOnNext(
- new ConnectionWriteAction()).subscribeOn( writeScheduler );
+ Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
+
+ collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
+
+ public Observable<ExportEntity> call(String collection) {
+ return Observable.create( new EntityObservable( em, collection ))
+ .doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
}
- },10).subscribeOn( writeScheduler ).subscribe();
-
- // start processing data and filling up write queues
-
- CollectionsOnSubscribe onSubscribe = new CollectionsOnSubscribe( em );
- rx.Observable collectionsObservable = rx.Observable.create( onSubscribe );
- collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
- public Observable<String> call(String collection) {
- return Observable.just(collection).doOnNext(
- new CollectionAction( em ) ).subscribeOn( readScheduler );
+
+ }, 10).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
+
+ public Observable<ExportConnection> call(ExportEntity exportEntity) {
+ return Observable.create( new ConnectionsObservable( em, exportEntity ))
+ .doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
}
- },40).subscribeOn( readScheduler ).subscribe();
-
- // wait for write thread pollers to get started
-
- try { Thread.sleep( 1000 ); } catch (InterruptedException ignored) {}
-
- // wait for write-thread pollers to stop
-
- while ( activePollers.get() > 0 ) {
- logger.info(
- "Active write threads: {}\n"
- +"Entities written: {}\n"
- +"Entities queued: {}\n"
- +"Connections written: {}\n"
- +"Connections queued: {}\n",
- new Object[] {
- activePollers.get(),
- entitiesWritten.get(),
- entitiesQueued.get(),
- connectionsWritten.get(),
- connectionsQueued.get()} );
- try { Thread.sleep( 5000 ); } catch (InterruptedException ignored) {}
- }
-
- // wrap up files
-
- for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
- //gen.writeEndArray();
- gen.flush();
- gen.close();
- }
-
- for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
- //gen.writeEndArray();
- gen.flush();
- gen.close();
- }
-
- for ( String fileName : emptyFiles ) {
- File emptyFile = new File(fileName);
- emptyFile.deleteOnExit();
- }
-
+
+ }, 10)
+ .subscribeOn( readScheduler )
+ .doOnCompleted( new FileWrapUpAction() )
+ .toBlocking().last();
}
@Override
@@ -222,12 +161,12 @@ public class ExportApp extends ExportingToolBase {
.withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
options.addOption( appNameOption );
- Option readThreadsOption = OptionBuilder
- .hasArg().withType(0).withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+ Option readThreadsOption = OptionBuilder.hasArg().withType(0)
+ .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
options.addOption( readThreadsOption );
- Option writeThreadsOption = OptionBuilder
- .hasArg().withType(0).withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+ Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
+ .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
options.addOption( writeThreadsOption );
return options;
@@ -239,17 +178,15 @@ public class ExportApp extends ExportingToolBase {
/**
* Emits collection names found in application.
*/
- class CollectionsOnSubscribe implements rx.Observable.OnSubscribe<String> {
+ class CollectionsObservable implements rx.Observable.OnSubscribe<String> {
EntityManager em;
- public CollectionsOnSubscribe( EntityManager em ) {
+ public CollectionsObservable(EntityManager em) {
this.em = em;
}
public void call(Subscriber<? super String> subscriber) {
- logger.info("Starting to read collections");
-
int count = 0;
try {
Map<String, Object> collectionMetadata = em.getApplicationCollectionMetadata();
@@ -261,11 +198,12 @@ public class ExportApp extends ExportingToolBase {
} catch (Exception e) {
subscriber.onError( e );
}
- logger.info("Done. Read {} collection names", count);
if ( count > 0 ) {
subscriber.onCompleted();
+ logger.info( "Completed. Read {} collection names", count );
} else {
subscriber.unsubscribe();
+ logger.info( "No collections found" );
}
}
}
@@ -273,11 +211,11 @@ public class ExportApp extends ExportingToolBase {
/**
* Emits entities of collection.
*/
- class EntityOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
+ class EntityObservable implements rx.Observable.OnSubscribe<ExportEntity> {
EntityManager em;
String collection;
- public EntityOnSubscribe(EntityManager em, String collection) {
+ public EntityObservable(EntityManager em, String collection) {
this.em = em;
this.collection = collection;
}
@@ -286,6 +224,8 @@ public class ExportApp extends ExportingToolBase {
logger.info("Starting to read entities of collection {}", collection);
+ subscriber.onStart();
+
try {
int count = 0;
@@ -327,10 +267,11 @@ public class ExportApp extends ExportingToolBase {
results = em.searchCollection( em.getApplicationRef(), collection, query );
}
- logger.info("Done. Read {} entities", count);
if ( count > 0 ) {
subscriber.onCompleted();
+ logger.info("Completed collection {}. Read {} entities", collection, count);
} else {
+ logger.info("Completed collection {} empty", collection );
subscriber.unsubscribe();
}
@@ -343,18 +284,19 @@ public class ExportApp extends ExportingToolBase {
/**
* Emits connections of an entity.
*/
- class ConnectionsOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
+ class ConnectionsObservable implements rx.Observable.OnSubscribe<ExportConnection> {
EntityManager em;
ExportEntity exportEntity;
- public ConnectionsOnSubscribe(EntityManager em, ExportEntity exportEntity) {
+ public ConnectionsObservable(EntityManager em, ExportEntity exportEntity) {
this.em = em;
this.exportEntity = exportEntity;
}
public void call(Subscriber<? super ExportConnection> subscriber) {
- logger.info("Starting to read connections for entity type {}", exportEntity.getEntity().getType());
+ logger.info( "Starting to read connections for entity {} type {}",
+ exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
int count = 0;
@@ -388,173 +330,16 @@ public class ExportApp extends ExportingToolBase {
} catch (Exception e) {
subscriber.onError( e );
}
-
- logger.info("Done. Read {} connections", count);
- if ( count > 0 ) {
- subscriber.onCompleted();
- } else {
- subscriber.unsubscribe();
- }
- }
- }
-
- /**
- * Process collection by starting processing of its entities.
- */
- class CollectionAction implements Action1<String> {
- EntityManager em;
-
- public CollectionAction( EntityManager em ) {
- this.em = em;
- }
-
- public void call(String collection) {
-
- // process entities of collection in parallel
- EntityOnSubscribe onSubscribe = new EntityOnSubscribe( em, collection );
- rx.Observable entityObservable = rx.Observable.create( onSubscribe );
- entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
- public Observable<ExportEntity> call(ExportEntity exportEntity) {
- return Observable.just(exportEntity).doOnNext(
- new EntityAction( em ) ).subscribeOn( readScheduler );
- }
- }, 8).subscribeOn(readScheduler).toBlocking().last();
- }
- }
-
- /**
- * Process entity by adding it to entityWriteQueue and starting processing of its connections.
- */
- class EntityAction implements Action1<ExportEntity> {
- EntityManager em;
-
- public EntityAction( EntityManager em ) {
- this.em = em;
- }
-
- public void call(ExportEntity exportEntity) {
- //logger.debug( "Processing entity: " + exportEntity.getEntity().getUuid() );
-
- entityWriteQueue.add( exportEntity );
- entitiesQueued.getAndIncrement();
-
- // if entity has connections, process them in parallel
- try {
- Results connectedEntities = em.getConnectedEntities(
- exportEntity.getEntity().getUuid(), null, null, Results.Level.CORE_PROPERTIES );
-
- if ( !connectedEntities.isEmpty() ) {
- ConnectionsOnSubscribe onSubscribe = new ConnectionsOnSubscribe( em, exportEntity );
- rx.Observable entityObservable = rx.Observable.create( onSubscribe );
-
- entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
- public Observable<ExportConnection> call(ExportConnection connection) {
- return Observable.just(connection).doOnNext(
- new ConnectionsAction() ).subscribeOn(readScheduler);
- }
- }, 8).subscribeOn(readScheduler).toBlocking().last();
- }
-
- } catch (Exception e) {
- throw new RuntimeException( "Error getting connections", e );
- }
- }
- }
-
- /**
- * Process connection by adding it to connectionWriteQueue.
- */
- class ConnectionsAction implements Action1<ExportConnection> {
-
- public void call(ExportConnection conn) {
- //logger.debug( "Processing connections for entity: " + conn.getSourceUuid() );
- connectionWriteQueue.add(conn);
- connectionsQueued.getAndIncrement();
- }
- }
-
-
- // ----------------------------------------------------------------------------------------
- // writing data
-
- /**
- * Emits entities to be written.
- */
- class EntityWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
- BlockingQueue<ExportEntity> queue;
-
- public EntityWritesOnSubscribe( BlockingQueue<ExportEntity> queue ) {
- this.queue = queue;
- }
-
- public void call(Subscriber<? super ExportEntity> subscriber) {
- int count = 0;
-
- while ( true ) {
- ExportEntity entity = null;
- try {
- //logger.debug( "Wrote {}. Polling for entity to write...", count );
- activePollers.getAndIncrement();
- entity = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
- } catch (InterruptedException e) {
- logger.error("Entity poll interrupted", e);
- continue;
- } finally {
- activePollers.getAndDecrement();
- }
- if ( entity == null ) {
- break;
- }
- subscriber.onNext( entity );
- count++;
- }
-
- logger.info("Done. De-queued {} entities", count);
- if ( count > 0 ) {
- subscriber.onCompleted();
- } else {
- subscriber.unsubscribe();
- }
- }
- }
-
- /**
- * Emits connections to be written.
- */
- class ConnectionWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
- BlockingQueue<ExportConnection> queue;
-
- public ConnectionWritesOnSubscribe( BlockingQueue<ExportConnection> queue ) {
- this.queue = queue;
- }
-
- public void call(Subscriber<? super ExportConnection> subscriber) {
- int count = 0;
- while ( true ) {
- ExportConnection connection = null;
- try {
- //logger.debug( "Wrote {}. Polling for connection to write", count );
- activePollers.getAndIncrement();
- connection = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
- } catch (InterruptedException e) {
- logger.error("Connection poll interrupted", e);
- continue;
- } finally {
- activePollers.getAndDecrement();
- }
- if ( connection == null ) {
- break;
- }
- subscriber.onNext( connection );
- count++;
- }
-
- logger.info("Done. De-queued {} connections", count);
if ( count > 0 ) {
subscriber.onCompleted();
+ logger.info("Completed entity {} type {} connections count {}",
+ new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
+
} else {
subscriber.unsubscribe();
+ logger.info( "Entity {} type {} has no connections",
+ exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
}
}
}
@@ -566,10 +351,9 @@ public class ExportApp extends ExportingToolBase {
public void call(ExportEntity entity) {
- boolean wroteData = false;
-
String [] parts = Thread.currentThread().getName().split("-");
- String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".entities";
+ String fileName = outputDir.getAbsolutePath() + File.separator
+ + applicationName.replace('/','-') + "-" + parts[3] + ".entities";
JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
if ( gen == null ) {
@@ -577,6 +361,7 @@ public class ExportApp extends ExportingToolBase {
// no generator so we are opening new file and writing the start of an array
try {
gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+ logger.info("Opened output file {}", fileName);
} catch (IOException e) {
throw new RuntimeException("Error opening output file: " + fileName, e);
}
@@ -589,15 +374,10 @@ public class ExportApp extends ExportingToolBase {
gen.writeObject( entity );
gen.writeRaw('\n');
entitiesWritten.getAndIncrement();
- wroteData = true;
} catch (IOException e) {
throw new RuntimeException("Error writing to output file: " + fileName, e);
}
-
- if ( !wroteData ) {
- emptyFiles.add( fileName );
- }
}
}
@@ -608,10 +388,9 @@ public class ExportApp extends ExportingToolBase {
public void call(ExportConnection conn) {
- boolean wroteData = false;
-
String [] parts = Thread.currentThread().getName().split("-");
- String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".connections";
+ String fileName = outputDir.getAbsolutePath() + File.separator
+ + applicationName.replace('/','-') + "-" + parts[3] + ".connections";
JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
if ( gen == null ) {
@@ -619,6 +398,7 @@ public class ExportApp extends ExportingToolBase {
// no generator so we are opening new file and writing the start of an array
try {
gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+ logger.info("Opened output file {}", fileName);
} catch (IOException e) {
throw new RuntimeException("Error opening output file: " + fileName, e);
}
@@ -631,18 +411,41 @@ public class ExportApp extends ExportingToolBase {
gen.writeObject( conn );
gen.writeRaw('\n');
connectionsWritten.getAndIncrement();
- wroteData = true;
} catch (IOException e) {
throw new RuntimeException("Error writing to output file: " + fileName, e);
}
+ }
+ }
+
+ private class FileWrapUpAction implements Action0 {
+ @Override
+ public void call() {
+
+ logger.info("-------------------------------------------------------------------");
+ logger.info("DONE! Entities: {} Connections: {}", entitiesWritten.get(), connectionsWritten.get());
+ logger.info("-------------------------------------------------------------------");
- if ( !wroteData ) {
- emptyFiles.add( fileName );
+ for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
+ try {
+ //gen.writeEndArray();
+ gen.flush();
+ gen.close();
+ } catch (IOException e) {
+ logger.error("Error closing output file", e);
+ }
+ }
+ for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
+ try {
+ //gen.writeEndArray();
+ gen.flush();
+ gen.close();
+ } catch (IOException e) {
+ logger.error("Error closing output file", e);
+ }
}
}
}
-
}
class ExportEntity {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
index 3de220c..a7c3905 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
@@ -138,7 +138,7 @@ public abstract class ExportingToolBase extends ToolBase {
if ( !file.mkdirs() ) {
- throw new RuntimeException( String.format( "Unable to create diretory %s", dirName ) );
+ throw new RuntimeException( String.format( "Unable to create directory %s", dirName ) );
}
return file;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index 14b9311..af8306f 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -27,14 +27,26 @@ import org.apache.usergrid.persistence.EntityManager;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
public class ExportAppTest {
static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
+
+ int NUM_COLLECTIONS = 5;
+ int NUM_ENTITIES = 10;
+ int NUM_CONNECTIONS = 1;
@ClassRule
public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
@@ -52,46 +64,110 @@ public class ExportAppTest {
ApplicationInfo appInfo = setup.getMgmtSvc().createApplication(
orgInfo.getOrganization().getUuid(), "app_" + rand );
- EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
+ final EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
- // create 10 connected things
+ // create connected things
- List<Entity> connectedThings = new ArrayList<Entity>();
+ final List<Entity> connectedThings = new ArrayList<Entity>();
String connectedType = "connected_thing";
em.createApplicationCollection(connectedType);
- for ( int j=0; j<10; j++) {
+ for ( int j=0; j<NUM_CONNECTIONS; j++) {
final String name = "connected_thing_" + j;
connectedThings.add( em.create( connectedType, new HashMap<String, Object>() {{
put( "name", name );
}} ) );
}
- // create 10 collections of 10 things, every other thing is connected to the connected things
-
- for ( int i=0; i<10; i++) {
- String type = "thing_"+i;
- em.createApplicationCollection(type);
- for ( int j=0; j<10; j++) {
- final String name = "thing_" + j;
- Entity source = em.create(type, new HashMap<String, Object>() {{ put("name", name); }});
- if ( j % 2 == 0 ) {
- for ( Entity target : connectedThings ) {
- em.createConnection( source, "has", target );
+ // create collections of things, every other thing is connected to the connected things
+
+ final AtomicInteger entitiesCount = new AtomicInteger(0);
+ final AtomicInteger connectionCount = new AtomicInteger(0);
+
+ ExecutorService execService = Executors.newFixedThreadPool( 50);
+ final Scheduler scheduler = Schedulers.from( execService );
+
+ Observable.range( 0, NUM_COLLECTIONS ).flatMap( new Func1<Integer, Observable<?>>() {
+ @Override
+ public Observable<?> call(Integer i) {
+
+ return Observable.just( i ).doOnNext( new Action1<Integer>() {
+ @Override
+ public void call(Integer i) {
+
+ final String type = "thing_"+i;
+ try {
+ em.createApplicationCollection( type );
+ connectionCount.getAndIncrement();
+
+ } catch (Exception e) {
+ throw new RuntimeException( "Error creating collection", e );
+ }
+
+ Observable.range( 0, NUM_ENTITIES ).flatMap( new Func1<Integer, Observable<?>>() {
+ @Override
+ public Observable<?> call(Integer j) {
+ return Observable.just( j ).doOnNext( new Action1<Integer>() {
+ @Override
+ public void call(Integer j) {
+
+ final String name = "thing_" + j;
+ try {
+ final Entity source = em.create(
+ type, new HashMap<String, Object>() {{ put("name", name); }});
+ entitiesCount.getAndIncrement();
+ logger.info( "Created entity {} type {}", name, type );
+
+ for ( Entity target : connectedThings ) {
+ em.createConnection( source, "has", target );
+ connectionCount.getAndIncrement();
+ logger.info( "Created connection from entity {} type {} to {}",
+ new Object[]{name, type, target.getName()} );
+ }
+
+
+ } catch (Exception e) {
+ throw new RuntimeException( "Error creating collection", e );
+ }
+
+
+ }
+
+ } );
+
+ }
+ }, 50 ).subscribeOn( scheduler ).subscribe(); // toBlocking().last();
+
}
- }
+ } );
+
+
}
+ }, 30 ).subscribeOn( scheduler ).toBlocking().last();
+
+ while ( entitiesCount.get() < NUM_COLLECTIONS * NUM_ENTITIES ) {
+ Thread.sleep( 5000 );
+ logger.info( "Still working. Created {} entities and {} connections",
+ entitiesCount.get(), connectionCount.get() );
}
-
- // export to file
- String directoryName = "./target/export" + rand;
+ logger.info( "Done. Created {} entities and {} connections", entitiesCount.get(), connectionCount.get() );
+
+ long start = System.currentTimeMillis();
+
+ String directoryName = "target/export" + rand;
ExportApp exportApp = new ExportApp();
exportApp.startTool( new String[]{
"-application", appInfo.getName(),
+ "-readThreads", "50",
+ "-writeThreads", "10",
"-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
"-outputDir", directoryName
}, false );
+ logger.info("time = " + (System.currentTimeMillis() - start)/1000 + "s");
+
+
+
}
}
\ No newline at end of file
[17/18] incubator-usergrid git commit: Merge branch 'two-dot-o-dev'
of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into
two-dot-o-dev
Posted by sn...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a562366e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a562366e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a562366e
Branch: refs/heads/two-dot-o-dev
Commit: a562366e030dbc6017032dd4af82354e64d48ffc
Parents: fedf165 ca4575d
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 22 11:17:10 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 22 11:17:10 2015 -0400
----------------------------------------------------------------------
.../usergrid/services/AbstractService.java | 270 ++++++++++---------
1 file changed, 144 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
[16/18] incubator-usergrid git commit: Merge branch 'two-dot-o' into
two-dot-o-dev
Posted by sn...@apache.org.
Merge branch 'two-dot-o' into two-dot-o-dev
Conflicts:
stack/core/pom.xml
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/fedf1651
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/fedf1651
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/fedf1651
Branch: refs/heads/two-dot-o-dev
Commit: fedf16516b1f5eb26281b8d9cf6ed3b260c715f5
Parents: d32cee4 066d7db
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 22 09:46:21 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 22 09:46:21 2015 -0400
----------------------------------------------------------------------
stack/pom.xml | 2 +-
stack/tools/pom.xml | 6 +
.../org/apache/usergrid/tools/ExportAdmins.java | 117 ++--
.../org/apache/usergrid/tools/ExportApp.java | 536 +++++++++++++++++++
.../usergrid/tools/ExportDataCreator.java | 244 +++++++--
.../usergrid/tools/ExportingToolBase.java | 2 +-
.../org/apache/usergrid/tools/ImportAdmins.java | 226 ++++++--
stack/tools/src/main/resources/log4j.properties | 5 +
.../apache/usergrid/tools/ExportAppTest.java | 118 ++++
.../usergrid/tools/ExportImportAdminsTest.java | 71 ++-
...adata.usergrid-management.1433331614293.json | 52 ++
...users.usergrid-management.1433331614293.json | 12 +
12 files changed, 1224 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fedf1651/stack/pom.xml
----------------------------------------------------------------------
diff --cc stack/pom.xml
index d7f279d,cc39e04..6f984e3
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@@ -109,10 -109,10 +109,10 @@@
<jersey-version>1.18.1</jersey-version>
<junit-version>4.12</junit-version>
<log4j-version>1.2.16</log4j-version>
- <org.springframework.version>3.1.2.RELEASE</org.springframework.version>
+ <org.springframework.version>3.2.13.RELEASE</org.springframework.version>
<shiro-version>1.2.3</shiro-version>
<slf4j-version>1.6.1</slf4j-version>
- <snakeyaml-version>1.8</snakeyaml-version>
+ <snakeyaml-version>1.9</snakeyaml-version>
<tomcat-version>7.0.59</tomcat-version>
<antlr.version>3.4</antlr.version>
<tika.version>1.4</tika.version>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fedf1651/stack/tools/src/main/resources/log4j.properties
----------------------------------------------------------------------
[13/18] incubator-usergrid git commit: Added data creator to generate
data for ExportApp testing.
Posted by sn...@apache.org.
Added data creator to generate data for ExportApp testing.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a6e68a0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a6e68a0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a6e68a0a
Branch: refs/heads/two-dot-o-dev
Commit: a6e68a0adc571b0a30138d4cb71b5e0e49a42c0a
Parents: 2748347
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 21 14:59:59 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 21 14:59:59 2015 -0400
----------------------------------------------------------------------
stack/pom.xml | 2 +-
stack/tools/pom.xml | 6 +
.../org/apache/usergrid/tools/ExportApp.java | 17 +-
.../usergrid/tools/ExportDataCreator.java | 244 +++++++++++++++----
.../org/apache/usergrid/tools/ToolBase.java | 2 +-
.../apache/usergrid/tools/ExportAppTest.java | 65 +----
6 files changed, 223 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index da1b62c..0e1b32d 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -103,7 +103,7 @@
<org.springframework.version>3.1.2.RELEASE</org.springframework.version>
<shiro-version>1.2.0</shiro-version>
<slf4j-version>1.6.1</slf4j-version>
- <snakeyaml-version>1.8</snakeyaml-version>
+ <snakeyaml-version>1.9</snakeyaml-version>
<tomcat-version>7.0.42</tomcat-version>
<antlr.version>3.4</antlr.version>
<tika.version>1.4</tika.version>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/pom.xml
----------------------------------------------------------------------
diff --git a/stack/tools/pom.xml b/stack/tools/pom.xml
index 4be3232..3402612 100644
--- a/stack/tools/pom.xml
+++ b/stack/tools/pom.xml
@@ -249,5 +249,11 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.codearte.jfairy</groupId>
+ <artifactId>jfairy</artifactId>
+ <version>0.4.3</version>
+ </dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index b2da0ea..db975e6 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -102,7 +102,7 @@ public class ExportApp extends ExportingToolBase {
*/
@Override
public void runTool(CommandLine line) throws Exception {
-
+
applicationName = line.getOptionValue( APPLICATION_NAME );
if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
@@ -122,8 +122,11 @@ public class ExportApp extends ExportingToolBase {
logger.info( "Export directory: " + outputDir.getAbsolutePath() );
startSpring();
-
+
UUID applicationId = emf.lookupApplication( applicationName );
+ if (applicationId == null) {
+ throw new RuntimeException( "Cannot find application " + applicationName );
+ }
final EntityManager em = emf.getEntityManager( applicationId );
organizationName = em.getApplication().getOrganizationName();
@@ -133,8 +136,9 @@ public class ExportApp extends ExportingToolBase {
Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
-
+
public Observable<ExportEntity> call(String collection) {
+
return Observable.create( new EntityObservable( em, collection ) )
.doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
}
@@ -142,16 +146,17 @@ public class ExportApp extends ExportingToolBase {
}, writeThreadCount ).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
public Observable<ExportConnection> call(ExportEntity exportEntity) {
+
return Observable.create( new ConnectionsObservable( em, exportEntity ) )
.doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
}
- }, writeThreadCount)
+ }, writeThreadCount )
.doOnCompleted( new FileWrapUpAction() )
.toBlocking().last();
}
-
-
+
+
// ----------------------------------------------------------------------------------------
// reading data
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java
index a3d6517..6fa4896 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java
@@ -17,90 +17,232 @@
package org.apache.usergrid.tools;
-import java.util.UUID;
-
-import org.apache.usergrid.management.ManagementService;
+import io.codearte.jfairy.Fairy;
+import io.codearte.jfairy.producer.company.Company;
+import io.codearte.jfairy.producer.person.Person;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.management.ApplicationInfo;
+import org.apache.usergrid.management.OrganizationInfo;
import org.apache.usergrid.management.OrganizationOwnerInfo;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.entities.Activity;
-import org.apache.usergrid.persistence.entities.User;
+import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
-import static org.junit.Assert.assertNotNull;
+import java.util.*;
/**
- * Simple class to create test for for exporting
- *
- * @author tnine
+ * Create an app full of users and data.
*/
-public class ExportDataCreator {
+public class ExportDataCreator extends ToolBase {
+
+ public static final String APP_NAME = "application";
+ public static final String ORG_NAME = "organization";
+ public static final String NUM_USERS = "users";
+ public static final String NUM_COLLECTIONS = "collections";
+ public static final String NUM_ENTITIES = "entities";
+ public static final String ADMIN_USERNAME = "username";
+ public static final String ADMIN_PASSWORD = "password";
- private EntityManagerFactory emf;
+ public String appName = "test-app";
+ public String orgName = "test-organization";
+ public int numUsers = 100;
+ public int numCollections = 20;
+ public int numEntities = 100;
+ public String adminUsername = "adminuser";
+ public String adminPassword = "test";
- private ManagementService managementService;
+ @Override
+ public void runTool(CommandLine line) throws Exception {
- /**
- * @param emf
- * @param managementService
- */
- public ExportDataCreator( EntityManagerFactory emf, ManagementService managementService ) {
- super();
- this.emf = emf;
- this.managementService = managementService;
+ startSpring();
+
+ setVerbose( line );
+
+ if (line.hasOption( APP_NAME )) {
+ appName = line.getOptionValue( APP_NAME );
+ }
+ if (line.hasOption( ORG_NAME )) {
+ orgName = line.getOptionValue( ORG_NAME );
+ }
+ if (line.hasOption( NUM_USERS )) {
+ numUsers = Integer.parseInt( line.getOptionValue( NUM_USERS ) );
+ }
+ if (line.hasOption( NUM_COLLECTIONS )) {
+ numCollections = Integer.parseInt( line.getOptionValue( NUM_COLLECTIONS ) );
+ }
+ if (line.hasOption( NUM_ENTITIES )) {
+ numEntities = Integer.parseInt( line.getOptionValue( NUM_ENTITIES ) );
+ }
+ if (line.hasOption( ADMIN_USERNAME )) {
+ adminUsername = line.getOptionValue( ADMIN_USERNAME );
+ }
+ if (line.hasOption( ADMIN_PASSWORD )) {
+ adminPassword = line.getOptionValue( ADMIN_PASSWORD );
+ }
+
+ createTestData();
}
- public void createTestData() throws Exception {
+ @Override
+ @SuppressWarnings("static-access")
+ public Options createOptions() {
- String orgName = "testexportorg";
+ Options options = super.createOptions();
- //nothing to do
- if ( managementService.getOrganizationByName( orgName ) != null ) {
- return;
- }
+ Option appName = OptionBuilder.hasArg()
+ .withDescription( "Application name to use" ).create( APP_NAME );
- OrganizationOwnerInfo orgInfo = managementService
- .createOwnerAndOrganization( orgName, "textExportUser@apigee.com", "Test User",
- "textExportUser@apigee.com", "password", true, false );
+ Option orgName = OptionBuilder.hasArg()
+ .withDescription( "Organization to use (will create if not present)" ).create( ORG_NAME );
- UUID appId = managementService.createApplication( orgInfo.getOrganization().getUuid(), "application" ).getId();
+ Option numUsers = OptionBuilder.hasArg()
+ .withDescription( "Number of users create (in addition to users)" ).create( NUM_USERS );
- EntityManager em = emf.getEntityManager( appId );
+ Option numCollection = OptionBuilder.hasArg()
+ .withDescription( "Number of collections to create (in addition to users)" ).create( NUM_COLLECTIONS );
- User first = new User();
- first.setUsername( "first" );
- first.setEmail( "first@usergrid.com" );
+ Option numEntities = OptionBuilder.hasArg()
+ .withDescription( "Number of entities to create per collection" ).create( NUM_ENTITIES );
- Entity firstUserEntity = em.create( first );
+ Option adminUsername = OptionBuilder.hasArg()
+ .withDescription( "Admin Username" ).create( ADMIN_USERNAME );
- assertNotNull( firstUserEntity );
+ Option adminPassword = OptionBuilder.hasArg()
+ .withDescription( "Admin Password" ).create( ADMIN_PASSWORD );
- User second = new User();
- second.setUsername( "second" );
- second.setEmail( "second@usergrid.com" );
+ options.addOption( appName );
+ options.addOption( orgName );
+ options.addOption( numUsers );
+ options.addOption( numCollection );
+ options.addOption( numEntities );
+ options.addOption( adminUsername );
+ options.addOption( adminPassword );
- Entity secondUserEntity = em.create( second );
+ return options;
+ }
- assertNotNull( secondUserEntity );
- em.createConnection( firstUserEntity, "likes", secondUserEntity );
+ public void createTestData() throws Exception {
- em.createConnection( secondUserEntity, "dislikes", firstUserEntity );
+ OrganizationInfo orgInfo = managementService.getOrganizationByName( orgName );
- // now create some activities and put them into the user stream
+ if (orgInfo == null) {
+ OrganizationOwnerInfo ownerInfo = managementService.createOwnerAndOrganization(
+ orgName, adminUsername + "@example.com", adminUsername,
+ adminUsername + "@example.com", adminPassword, true, false );
+ orgInfo = ownerInfo.getOrganization();
+ }
- Activity activity = new Activity();
+ ApplicationInfo appInfo = managementService.getApplicationInfo( orgName + "/" + appName );
- Activity.ActivityObject actor = new Activity.ActivityObject();
- actor.setEntityType( "user" );
- actor.setId( firstUserEntity.getUuid().toString() );
+ if (appInfo == null) {
+ UUID appId = managementService.createApplication( orgInfo.getUuid(), appName ).getId();
+ appInfo = managementService.getApplicationInfo( appId );
+ }
- activity.setActor( actor );
- activity.setVerb( "POST" );
+ EntityManager em = emf.getEntityManager( appInfo.getId() );
+
+ Fairy fairy = Fairy.create();
+
+ List<Entity> users = new ArrayList<Entity>( numUsers );
+
+ for (int i = 0; i < numUsers; i++) {
+
+ final Person person = fairy.person();
+ Entity userEntity = null;
+ try {
+ final Map<String, Object> userMap = new HashMap<String, Object>() {{
+ put( "username", person.username() );
+ put( "password", person.password() );
+ put( "email", person.email() );
+ put( "companyEmail", person.companyEmail() );
+ put( "dateOfBirth", person.dateOfBirth() );
+ put( "firstName", person.firstName() );
+ put( "lastName", person.lastName() );
+ put( "nationalIdentificationNumber", person.nationalIdentificationNumber() );
+ put( "telephoneNumber", person.telephoneNumber() );
+ put( "passportNumber", person.passportNumber() );
+ put( "address", person.getAddress() );
+ }};
+
+ userEntity = em.create( "user", userMap );
+ users.add( userEntity );
+
+ } catch (DuplicateUniquePropertyExistsException e) {
+ logger.error( "Dup user generated: " + person.username() );
+ continue;
+ } catch (Exception e) {
+ logger.error("Error creating user", e);
+ continue;
+ }
+
+ final Company company = person.getCompany();
+ try {
+ EntityRef ref = em.getAlias( "company", company.name() );
+ Entity companyEntity = (ref == null) ? null : em.get( ref );
+
+ // create company if it does not exist yet
+ if ( companyEntity == null ) {
+ final Map<String, Object> companyMap = new HashMap<String, Object>() {{
+ put( "name", company.name() );
+ put( "domain", company.domain() );
+ put( "email", company.email() );
+ put( "url", company.url() );
+ put( "vatIdentificationNumber", company.vatIdentificationNumber() );
+ }};
+ companyEntity = em.create( "company", companyMap );
+ } else {
+ logger.info("Company {} already exists", company.name());
+ }
+
+ em.createConnection( userEntity, "employer", companyEntity );
+
+ } catch (DuplicateUniquePropertyExistsException e) {
+ logger.error( "Dup company generated {} property={}", company.name(), e.getPropertyName() );
+ continue;
+ } catch (Exception e) {
+ logger.error("Error creating or connecting company", e);
+ continue;
+ }
+
+ try {
+ for (int j = 0; j < 5; j++) {
+ Activity activity = new Activity();
+ Activity.ActivityObject actor = new Activity.ActivityObject();
+ actor.setEntityType( "user" );
+ actor.setId( userEntity.getUuid().toString() );
+ activity.setActor( actor );
+ activity.setVerb( "POST" );
+ activity.setContent( "User " + person.username() + " generated a random string "
+ + RandomStringUtils.randomAlphanumeric( 5 ) );
+ em.createItemInCollection( userEntity, "activities", "activity", activity.getProperties() );
+ }
+
+ if (users.size() > 10) {
+ for (int j = 0; j < 5; j++) {
+ try {
+ em.createConnection( userEntity, "associate", users.get( (int) (Math.random() * users.size()) ) );
+ } catch (Exception e) {
+ logger.error( "Error connecting user to user: " + e.getMessage() );
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ logger.error("Error creating activities", e);
+ continue;
+ }
- em.createItemInCollection( firstUserEntity, "activities", "activity", activity.getProperties() );
+ }
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
index 63187a4..3c427e1 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
@@ -116,7 +116,7 @@ public abstract class ToolBase {
public void printCliHelp( String message ) {
System.out.println( message );
HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp( "java -jar usergrid-tools-0.0.1-SNAPSHOT.jar " + getToolName(), createOptions() );
+ formatter.printHelp( "java -jar usergrid-tools-1.0.2.jar " + getToolName(), createOptions() );
System.exit( -1 );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index a1e3f6b..446aa91 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -63,65 +63,23 @@ public class ExportAppTest {
// create app with some data
- OrganizationOwnerInfo orgInfo = setup.getMgmtSvc().createOwnerAndOrganization(
- "org_" + rand, "user_" + rand, rand.toUpperCase(), rand + "@example.com", rand );
-
- ApplicationInfo appInfo = setup.getMgmtSvc().createApplication(
- orgInfo.getOrganization().getUuid(), "app_" + rand );
-
- final EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
+ String orgName = "org_" + rand;
+ String appName = "app_" + rand;
+
+ ExportDataCreator creator = new ExportDataCreator();
+ creator.startTool( new String[] {
+ "-organization", orgName,
+ "-application", appName,
+ "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort()
+ }, false);
- // create connected things
-
- final List<Entity> connectedThings = new ArrayList<Entity>();
- String connectedType = "connected_thing";
- em.createApplicationCollection(connectedType);
- for ( int j=0; j<NUM_CONNECTIONS; j++) {
- final String name = "connected_thing_" + j;
- connectedThings.add( em.create( connectedType, new HashMap<String, Object>() {{
- put( "name", name );
- }} ) );
- }
-
- // create collections of things, every other thing is connected to the connected things
-
- final AtomicInteger entitiesCount = new AtomicInteger(0);
- final AtomicInteger connectionCount = new AtomicInteger(0);
-
- ExecutorService execService = Executors.newFixedThreadPool( 50);
- final Scheduler scheduler = Schedulers.from( execService );
-
- for (int i = 0; i < NUM_COLLECTIONS; i++) {
-
- final String type = "thing_" + i;
- em.createApplicationCollection( type );
- connectionCount.getAndIncrement();
-
- for (int j = 0; j < NUM_ENTITIES; j++) {
- final String name = "thing_" + j;
- final Entity source = em.create(
- type, new HashMap<String, Object>() {{
- put( "name", name );
- }} );
- entitiesCount.getAndIncrement();
-
- for (Entity target : connectedThings) {
- em.createConnection( source, "has", target );
- connectionCount.getAndIncrement();
- }
- }
- }
-
- logger.info( "Done. Created {} entities and {} connections", entitiesCount.get(), connectionCount.get() );
-
long start = System.currentTimeMillis();
String directoryName = "target/export" + rand;
ExportApp exportApp = new ExportApp();
exportApp.startTool( new String[]{
- "-application", appInfo.getName(),
- "-readThreads", "100",
+ "-application", orgName + "/" + appName,
"-writeThreads", "100",
"-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
"-outputDir", directoryName
@@ -137,8 +95,7 @@ public class ExportAppTest {
File exportDir1 = new File(directoryName + "1");
exportApp.startTool( new String[]{
- "-application", appInfo.getName(),
- "-readThreads", "1",
+ "-application", orgName + "/" + appName,
"-writeThreads", "1",
"-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
"-outputDir", directoryName + "1"
[18/18] incubator-usergrid git commit: Merge branch 'two-dot-o-dev'
of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into
two-dot-o-dev
Posted by sn...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/89dd0ad9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/89dd0ad9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/89dd0ad9
Branch: refs/heads/two-dot-o-dev
Commit: 89dd0ad9868acda49da57fc24748f0cc4f89055b
Parents: a562366 b1393b4
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 22 11:18:29 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 22 11:18:29 2015 -0400
----------------------------------------------------------------------
.../usergrid/services/AbstractService.java | 64 ++++++++++----------
1 file changed, 31 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
[07/18] incubator-usergrid git commit: Make flatmap max observables
match write thread count and use Schedulers.io() instead of a custom
readScheduler.
Posted by sn...@apache.org.
Make flatmap max observables match write thread count and use Schedulers.io() instead of a custom readScheduler.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b58390d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b58390d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b58390d9
Branch: refs/heads/two-dot-o-dev
Commit: b58390d96eb423287247774533b18e9c4ee43843
Parents: dab84e9
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 15 13:53:55 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 15 13:53:55 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportApp.java | 39 +++++---------------
.../apache/usergrid/tools/ExportAppTest.java | 12 +++---
2 files changed, 14 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b58390d9/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index c302a74..b2da0ea 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -54,7 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*
* Will create as many output files as there are writeThreads (by default: 10).
*
- * Will create two types of files: *.uge for Usegrird entities and *.ugc for entity to entity connections.
+ * Will create two types of files: *.entities for Usegrird entities and *.collections for entity to entity connections.
*
* Every line of the data files is a complete JSON object.
*/
@@ -62,7 +62,6 @@ public class ExportApp extends ExportingToolBase {
static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
static final String APPLICATION_NAME = "application";
- private static final String READ_THREAD_COUNT = "readThreads";
private static final String WRITE_THREAD_COUNT = "writeThreads";
String applicationName;
@@ -71,16 +70,13 @@ public class ExportApp extends ExportingToolBase {
AtomicInteger entitiesWritten = new AtomicInteger(0);
AtomicInteger connectionsWritten = new AtomicInteger(0);
- Scheduler readScheduler;
Scheduler writeScheduler;
ObjectMapper mapper = new ObjectMapper();
Map<Thread, JsonGenerator> entityGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
- // set via CLI
- int readThreadCount = 80;
- int writeThreadCount = 10; // limiting write will limit output files
+ int writeThreadCount = 10; // set via CLI option; limiting write will limit output files
@Override
@@ -93,10 +89,6 @@ public class ExportApp extends ExportingToolBase {
.withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
options.addOption( appNameOption );
- Option readThreadsOption = OptionBuilder.hasArg().withType(0)
- .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
- options.addOption( readThreadsOption );
-
Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
.withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
options.addOption( writeThreadsOption );
@@ -113,15 +105,6 @@ public class ExportApp extends ExportingToolBase {
applicationName = line.getOptionValue( APPLICATION_NAME );
- if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
- try {
- readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
- } catch (NumberFormatException nfe) {
- logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
- return;
- }
- }
-
if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
try {
writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
@@ -144,9 +127,6 @@ public class ExportApp extends ExportingToolBase {
final EntityManager em = emf.getEntityManager( applicationId );
organizationName = em.getApplication().getOrganizationName();
- ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
- readScheduler = Schedulers.from( readThreadPoolExecutor );
-
ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
writeScheduler = Schedulers.from( writeThreadPoolExecutor );
@@ -155,19 +135,18 @@ public class ExportApp extends ExportingToolBase {
collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
public Observable<ExportEntity> call(String collection) {
- return Observable.create( new EntityObservable( em, collection ))
+ return Observable.create( new EntityObservable( em, collection ) )
.doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
}
-
- }, 10).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
-
+
+ }, writeThreadCount ).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
+
public Observable<ExportConnection> call(ExportEntity exportEntity) {
- return Observable.create( new ConnectionsObservable( em, exportEntity ))
+ return Observable.create( new ConnectionsObservable( em, exportEntity ) )
.doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
}
-
- }, 10)
- .subscribeOn( readScheduler )
+
+ }, writeThreadCount)
.doOnCompleted( new FileWrapUpAction() )
.toBlocking().last();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b58390d9/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index c5411fd..a1e3f6b 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -24,7 +24,6 @@ import org.apache.usergrid.management.ApplicationInfo;
import org.apache.usergrid.management.OrganizationOwnerInfo;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
-import org.junit.Assert;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,9 +131,9 @@ public class ExportAppTest {
File exportDir = new File(directoryName);
assertTrue( getFileCount( exportDir, "entities" ) > 0 );
- assertTrue( getFileCount( exportDir, "collections" ) > 0 );
- assertTrue( getFileCount( exportDir, "entities" ) >= 100 );
- assertTrue( getFileCount( exportDir, "collections" ) >= 100 );
+ assertTrue( getFileCount( exportDir, "connections" ) > 0 );
+ assertTrue( getFileCount( exportDir, "entities" ) <= 100 );
+ assertTrue( getFileCount( exportDir, "connections" ) <= 100 );
File exportDir1 = new File(directoryName + "1");
exportApp.startTool( new String[]{
@@ -147,9 +146,8 @@ public class ExportAppTest {
logger.info( "1 thread time = " + (System.currentTimeMillis() - start) / 1000 + "s" );
- exportDir = new File(directoryName);
- assertEquals( 1, getFileCount( exportDir, "entities" ));
- assertEquals( 1, getFileCount( exportDir, "collections" ));
+ assertEquals( 1, getFileCount( exportDir1, "entities" ));
+ assertEquals( 1, getFileCount( exportDir1, "connections" ));
}
private static int getFileCount(File exportDir, final String ext ) {
[10/18] incubator-usergrid git commit: More consistency in logging
about admin users, using username : email : uuid format.
Posted by sn...@apache.org.
More consistency in logging about admin users, using username : email : uuid format.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4f8aa2f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4f8aa2f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4f8aa2f3
Branch: refs/heads/two-dot-o-dev
Commit: 4f8aa2f36fca8c661d627f1748e3994e62107ce4
Parents: 125ffe9
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Jul 17 17:39:25 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Jul 17 17:39:25 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ImportAdmins.java | 29 ++++++++++++++------
1 file changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4f8aa2f3/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
index 7e08a4c..f39ef9b 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
@@ -462,7 +462,13 @@ public class ImportAdmins extends ToolBase {
} else { // org exists, add original user to it
try {
managementService.addAdminUserToOrganization( userInfo, orgInfo, false );
- logger.debug( "Added user {} to org {}", new Object[]{user.getEmail(), orgName} );
+ logger.debug( "Added to org user {}:{}:{}",
+ new Object[]{
+ orgInfo.getName(),
+ user.getUsername(),
+ user.getEmail(),
+ user.getUuid()
+ });
} catch (Exception e) {
logger.error( "Error Adding user {} to org {}", new Object[]{user.getEmail(), orgName} );
@@ -533,7 +539,9 @@ public class ImportAdmins extends ToolBase {
logger.debug( "Created new org {} for user {}:{}:{} from duplicate user {}:{}",
new Object[]{
orgInfo.getName(),
- originalUser.getUuid(), originalUser.getUsername(), originalUser.getEmail(),
+ originalUser.getUsername(),
+ originalUser.getEmail(),
+ originalUser.getUuid(),
dup.username, dup.email
});
@@ -546,7 +554,9 @@ public class ImportAdmins extends ToolBase {
logger.debug( "Added to org user {}:{}:{} from duplicate user {}:{}",
new Object[]{
orgInfo.getName(),
- originalUser.getUuid(), originalUser.getUsername(), originalUser.getEmail(),
+ originalUser.getUsername(),
+ originalUser.getEmail(),
+ originalUser.getUuid(),
dup.username, dup.email
});
@@ -686,7 +696,7 @@ public class ImportAdmins extends ToolBase {
while (!done) {
try {
- ImportMetadataTask task = this.workQueue.poll(30, TimeUnit.SECONDS);
+ ImportMetadataTask task = this.workQueue.poll( 30, TimeUnit.SECONDS );
if (task == null) {
logger.warn("Reading from metadata queue was null!");
@@ -694,11 +704,11 @@ public class ImportAdmins extends ToolBase {
Thread.sleep(1000);
continue;
}
- metadataEmptyCount.set(0);
+ metadataEmptyCount.set( 0 );
long startTime = System.currentTimeMillis();
- importEntityMetadata(em, task.entityRef, task.metadata);
+ importEntityMetadata( em, task.entityRef, task.metadata );
long stopTime = System.currentTimeMillis();
long duration = stopTime - startTime;
@@ -770,7 +780,7 @@ public class ImportAdmins extends ToolBase {
em.create(uuid, type, entityProps);
logger.debug( "Imported admin user {}:{}:{}",
- new Object[] { uuid, entityProps.get( "username" ), entityProps.get("email") } );
+ new Object[] { entityProps.get( "username" ), entityProps.get("email"), uuid } );
userCount.getAndIncrement();
auditQueue.put(entityProps);
@@ -805,7 +815,10 @@ public class ImportAdmins extends ToolBase {
private void handleDuplicateAccount(EntityManager em, String dupProperty, Map<String, Object> entityProps ) {
logger.info( "Processing duplicate user {}:{}:{} with duplicate {}", new Object[]{
- entityProps.get( "uuid" ), entityProps.get( "username" ), entityProps.get( "email" ), dupProperty} );
+ entityProps.get( "username" ),
+ entityProps.get( "email" ),
+ entityProps.get( "uuid" ),
+ dupProperty} );
UUID dupUuid = UUID.fromString( entityProps.get("uuid").toString() );
try {
[14/18] incubator-usergrid git commit: merge
Posted by sn...@apache.org.
merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9b9f55ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9b9f55ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9b9f55ac
Branch: refs/heads/two-dot-o-dev
Commit: 9b9f55ac20f669451dadc7cdeeb4643ac0614ad4
Parents: a6e68a0 897fd50
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Jul 21 14:09:05 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Jul 21 14:09:05 2015 -0600
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportAdmins.java | 118 ++++++----
.../org/apache/usergrid/tools/ImportAdmins.java | 226 ++++++++++++++-----
.../usergrid/tools/ExportImportAdminsTest.java | 71 ++++--
...adata.usergrid-management.1433331614293.json | 52 +++++
...users.usergrid-management.1433331614293.json | 12 +
5 files changed, 358 insertions(+), 121 deletions(-)
----------------------------------------------------------------------