You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/02/03 16:32:47 UTC
[1/3] incubator-usergrid git commit: Using two-pass approach to
ensure Entities are created before Connections.
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-import 63088a2fa -> c652c39bc
Using two-pass approach to ensure Entities are created before Connections.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/858cbc0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/858cbc0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/858cbc0e
Branch: refs/heads/two-dot-o-import
Commit: 858cbc0e8003fbbb79763e0602a8efc9a6b93783
Parents: 41852d6
Author: Dave Johnson <dm...@apigee.com>
Authored: Fri Jan 30 08:10:04 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Fri Jan 30 08:10:04 2015 -0500
----------------------------------------------------------------------
.../usergrid/management/export/ExportJob.java | 2 +-
.../usergrid/management/importer/ImportJob.java | 6 +-
.../management/importer/ImportServiceImpl.java | 91 +++++++++++++++-----
.../management/importer/ImportServiceIT.java | 19 ++--
4 files changed, 79 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/858cbc0e/stack/services/src/main/java/org/apache/usergrid/management/export/ExportJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/export/ExportJob.java b/stack/services/src/main/java/org/apache/usergrid/management/export/ExportJob.java
index 8e845b4..3bdfac9 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/export/ExportJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/export/ExportJob.java
@@ -46,7 +46,7 @@ public class ExportJob extends OnlyOnceJob {
@Override
public void doJob( JobExecution jobExecution ) throws Exception {
- logger.info( "execute ExportJob {}", jobExecution );
+ logger.info( "execute ExportJob {}", jobExecution.getJobId().toString() );
JobData jobData = jobExecution.getJobData();
if ( jobData == null ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/858cbc0e/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
index 9ebb65e..24393cd 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
@@ -42,13 +42,13 @@ public class ImportJob extends OnlyOnceJob {
@Autowired
ImportService importService;
- public ImportJob() {
+ public ImportJob(){
logger.info( "ImportJob created " + this );
}
@Override
protected void doJob(JobExecution jobExecution) throws Exception {
- logger.info( "execute ImportJob {}", jobExecution );
+ logger.info( "execute ImportJob {}", jobExecution.getJobId().toString() );
JobData jobData = jobExecution.getJobData();
if ( jobData == null ) {
@@ -76,7 +76,7 @@ public class ImportJob extends OnlyOnceJob {
this.importService = importService;
}
-
+
/**
* This method is called when the job is retried maximum times by the
* scheduler but still fails. Thus the scheduler marks it as DEAD.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/858cbc0e/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 128f69e..e78675b 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -861,9 +861,16 @@ public class ImportServiceImpl implements ImportService {
if ( entityRef.getType() == null ) {
entityRef = em.get(ownerEntityRef.getUuid());
}
+
+ logger.debug("Creating connection from {}:{} to {}:{}",
+ new Object[] {
+ ownerEntityRef.getType(), ownerEntityRef.getUuid(),
+ entityRef.getType(), entityRef.getUuid() });
+
em.createConnection(ownerEntityRef, connectionType, entityRef);
} catch (Exception e) {
+ logger.error("Error writing connection", e);
fileImport.setErrorMessage(e.getMessage());
try {
rootEm.update(fileImport);
@@ -895,11 +902,19 @@ public class ImportServiceImpl implements ImportService {
public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
EntityManager rootEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
try {
+
+ logger.debug("Adding map to {}:{} dictionary {}",
+ new Object[] {ownerEntityRef.getType(), ownerEntityRef.getType(), dictionaryName });
+
em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
+
} catch (Exception e) {
+ logger.error("Error writing dictionary", e);
fileImport.setErrorMessage(e.getMessage());
try {
+
rootEm.update(fileImport);
+
} catch (Exception ex) {
// TODO should we abort at this point?
@@ -925,19 +940,36 @@ public class ImportServiceImpl implements ImportService {
this.fileImport = fileImport;
}
+
@Override
public void call(final Subscriber<? super WriteEvent> subscriber) {
- WriteEvent entityWrapper = null;
- EntityRef ownerEntityRef = null;
- String entityUuid = "";
- String entityType = "";
+ // have to do this in two passes so that entities are created before connections
+
+ // first entities
+ process( subscriber, true );
+
+ // next connections and dictionaries
+ process( subscriber, false);
+ }
+
+
+ private void process(final Subscriber<? super WriteEvent> subscriber, boolean entities ) {
+
+ logger.debug("process(): entities = " + entities );
+
+ EntityRef lastEntitySeenRef = null;
+
try {
+
while (!subscriber.isUnsubscribed() && jp.nextToken() != JsonToken.END_OBJECT) {
+
String collectionName = jp.getCurrentName();
+ logger.debug("Processing currentName: " + jp.getCurrentName());
+
// create the wrapper for connections
- if ( collectionName != null && collectionName.equals("connections")) {
+ if (collectionName != null && collectionName.equals("connections")) {
jp.nextToken(); // START_OBJECT
while (jp.nextToken() != JsonToken.END_OBJECT) {
@@ -947,49 +979,59 @@ public class ImportServiceImpl implements ImportService {
while (jp.nextToken() != JsonToken.END_ARRAY) {
String entryId = jp.getText();
- EntityRef entryRef = new SimpleEntityRef(UUID.fromString(entryId));
- entityWrapper = new ConnectionEvent(ownerEntityRef, connectionType, entryRef);
-
- // Creates a new subscriber to the observer with the given connection wrapper
- subscriber.onNext(entityWrapper);
+ if ( !entities ) {
+ EntityRef entryRef = new SimpleEntityRef(UUID.fromString(entryId));
+ WriteEvent entityWrapper = new ConnectionEvent(lastEntitySeenRef, connectionType, entryRef);
+ subscriber.onNext(entityWrapper);
+ }
}
}
}
// create the wrapper for dictionaries
- else if ( collectionName != null && collectionName.equals("dictionaries")) {
+ else if (collectionName != null && collectionName.equals("dictionaries")) {
jp.nextToken(); // START_OBJECT
while (jp.nextToken() != JsonToken.END_OBJECT) {
String dictionaryName = jp.getCurrentName();
-
jp.nextToken();
-
Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
- entityWrapper = new DictionaryEvent(ownerEntityRef, dictionaryName, dictionary);
- // Creates a new subscriber to the observer with the given dictionary wrapper
- subscriber.onNext(entityWrapper);
+ if ( !entities ) {
+ WriteEvent entityWrapper = new DictionaryEvent(
+ lastEntitySeenRef, dictionaryName, dictionary);
+ subscriber.onNext(entityWrapper);
+ }
}
subscriber.onCompleted();
} else {
// Regular collections
+
jp.nextToken(); // START_OBJECT
- Map<String, Object> properties = new HashMap<String, Object>();
JsonToken token = jp.nextToken();
+ Map<String, Object> properties = new HashMap<>();
+
+ String entityUuid = null;
+ String entityType = null;
+
while (token != JsonToken.END_OBJECT) {
+
if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) {
+
String key = jp.getCurrentName();
+ logger.debug(" currentName: " + jp.getText());
+
if (key.equals("uuid")) {
entityUuid = jp.getText();
} else if (key.equals("type")) {
entityType = jp.getText();
+
} else if (key.length() != 0 && jp.getText().length() != 0) {
String value = jp.getText();
properties.put(key, value);
@@ -998,24 +1040,29 @@ public class ImportServiceImpl implements ImportService {
token = jp.nextToken();
}
- ownerEntityRef = new SimpleEntityRef(entityType, UUID.fromString(entityUuid));
- entityWrapper = new EntityEvent(UUID.fromString(entityUuid), entityType, properties);
-
- // Creates a new subscriber to the observer with the given dictionary wrapper
- subscriber.onNext(entityWrapper);
+ if ( entities ) {
+ WriteEvent entityWrapper = new EntityEvent(
+ UUID.fromString(entityUuid), entityType, properties);
+ subscriber.onNext(entityWrapper);
+ }
+ // don't save it, but do keep track of last entity seen
+ lastEntitySeenRef = new SimpleEntityRef( entityType, UUID.fromString(entityUuid) );
}
}
+
} catch (Exception e) {
// skip illegal entity UUID and go to next one
fileImport.setErrorMessage(e.getMessage());
try {
rootEm.update(fileImport);
} catch (Exception ex) {
+ logger.error("Error updating file import record", ex);
}
subscriber.onError(e);
}
}
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/858cbc0e/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
index 4a028b9..178909b 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
@@ -202,21 +202,14 @@ public class ImportServiceIT {
assertTrue( !importedThings.isEmpty() );
- // first two things have connections
- for (int i = 0; i < 2; i++) {
- Results r = em2.getConnectedEntities(
- importedThings.get(i), "related", null, Level.IDS);
+ // two things have connections
+ int conCount = 0;
+ for ( Entity e : importedThings ) {
+ Results r = em2.getConnectedEntities( e, "related", null, Level.IDS);
List<ConnectionRef> connections = r.getConnections();
- assertTrue( !connections.isEmpty() );
- }
-
- // other things do not have connections
- for (int i = 3; i < 10; i++) {
- Results r = em2.getConnectedEntities(
- importedThings.get(i), "related", null, Level.IDS);
- List<ConnectionRef> connections = r.getConnections();
- assertTrue( connections.isEmpty() );
+ conCount += connections.size();
}
+ assertEquals( 2, conCount );
logger.debug("\n\nCheck dictionary\n");
[3/3] incubator-usergrid git commit: Merge branch 'two-dot-o-import'
of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into
two-dot-o-import
Posted by sn...@apache.org.
Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import
Conflicts:
stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c652c39b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c652c39b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c652c39b
Branch: refs/heads/two-dot-o-import
Commit: c652c39bc7a87db15dd68349b7709df60cc8566d
Parents: b22c37a 63088a2
Author: Dave Johnson <dm...@apigee.com>
Authored: Tue Feb 3 10:32:32 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Tue Feb 3 10:32:32 2015 -0500
----------------------------------------------------------------------
.../queue/impl/QueueScopeFactoryImpl.java | 1 +
.../queue/impl/SQSQueueManagerImpl.java | 51 --------
.../queue/impl/UsergridAwsCredentials.java | 59 +++++++++
.../impl/UsergridAwsCredentialsProvider.java | 59 +++++++++
.../organizations/OrganizationResource.java | 26 ++--
.../applications/ApplicationResource.java | 48 ++++---
.../rest/management/ExportResourceIT.java | 11 +-
.../rest/management/ImportResourceIT.java | 14 +-
.../management/export/ExportServiceImpl.java | 2 +-
.../management/export/S3ExportImpl.java | 5 +-
.../management/importer/FileImportJob.java | 4 +-
.../management/importer/ImportService.java | 19 +--
.../management/importer/ImportServiceImpl.java | 128 +++++++++++++------
.../management/importer/S3ImportImpl.java | 6 +-
.../notifications/NotificationsService.java | 2 +-
.../services/queues/ImportQueueListener.java | 26 ++--
.../services/queues/ImportQueueManager.java | 65 ++++++++++
.../services/queues/ImportQueueMessage.java | 42 +++++-
.../usergrid/services/queues/QueueListener.java | 4 +-
.../resources/usergrid-services-context.xml | 23 +++-
.../management/export/ExportServiceIT.java | 23 ++--
.../management/importer/ImportServiceIT.java | 48 +++++--
.../src/test/resources/log4j.properties | 1 +
23 files changed, 466 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c652c39b/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
index 6f2eddd,6f2eddd..0b1fac5
--- a/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
@@@ -17,6 -17,6 +17,7 @@@
package org.apache.usergrid.management.export;
++import com.amazonaws.SDKGlobalConfiguration;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Module;
@@@ -50,8 -50,8 +51,8 @@@ public class S3ExportImpl implements S3
Map<String, Object> storage_info = (Map<String,Object>)properties.get( "storage_info" );
String bucketName = ( String ) storage_info.get( "bucket_location" );
-- String accessId = ( String ) storage_info.get( "s3_access_id" );
-- String secretKey = ( String ) storage_info.get( "s3_key" );
++ String accessId = ( String ) storage_info.get( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR );
++ String secretKey = ( String ) storage_info.get( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR );
Properties overrides = new Properties();
overrides.setProperty( "s3" + ".identity", accessId );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c652c39b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
index cb83ce8,edeed1c..089c8e5
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
@@@ -54,33 -54,19 +54,33 @@@ public class FileImportJob extends Only
@Override
protected void doJob(JobExecution jobExecution) throws Exception {
- logger.info( "execute FileImportJob {}", jobExecution.toString() );
+ logger.info("execute FileImportJob {}", jobExecution.toString());
- JobData jobData = jobExecution.getJobData();
- if ( jobData == null ) {
- logger.error( "jobData cannot be null" );
- return;
- }
+ try {
+ JobData jobData = jobExecution.getJobData();
+ if (jobData == null) {
+ logger.error("jobData cannot be null");
+ return;
+ }
+
+ // heartbeat to indicate job has started
+ jobExecution.heartbeat();
- // heartbeat to indicate job has started
- jobExecution.heartbeat();
+ // call the File Parser for the file set in job execution
+ importService.parseFileToEntities(jobExecution);
-
+
- // call the File Parser for the file set in job execution
- //importService.parseFileToEntities(jobExecution);
+ } catch ( Throwable t ) {
+ logger.debug("Error importing file", t);
+
+ // update file import record
+ UUID fileImportId = (UUID) jobExecution.getJobData().getProperty(FILE_IMPORT_ID);
+ EntityManager em = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+ FileImport fileImport = em.get(fileImportId, FileImport.class);
+ fileImport.setState( FileImport.State.FAILED );
+ em.update( fileImport );
+
+ throw t;
+ }
logger.error("File Import Service completed job");
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c652c39b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
index bc90f49,26fb31f..2275888
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
@@@ -46,17 -47,17 +47,18 @@@ public interface ImportService
/**
* Parses the input file and creates entities
-- *
-- * @param jobExecution
-- * @throws Exception
+ */
+ void parseFileToEntities(ImportQueueMessage importQueueMessage) throws Exception;
+
+ /**
++ * Parses the input file and creates entities
+ */
+ void parseFileToEntities(JobExecution jobExecution) throws Exception;
+
+ /**
* Get the state for the Job with UUID
* @param uuid Job UUID
* @return State of Job
-- * @throws Exception
*/
String getState(UUID uuid) throws Exception;
@@@ -64,21 -65,21 +66,22 @@@
* Returns error message for the job with UUID
* @param uuid Job UUID
* @return error message
-- * @throws Exception
*/
String getErrorMessage(UUID uuid) throws Exception;
/**
-- * @param jobExecution
* @return FileImportEntity
-- * @throws Exception
+ */
+ FileImport getFileImportEntity(final ImportQueueMessage importQueueMessage) throws Exception;
+
+ /**
++ * @return FileImportEntity
+ */
+ FileImport getFileImportEntity(final JobExecution jobExecution) throws Exception;
+
+ /**
* @param jobExecution
* @return ImportEntity
-- * @throws Exception
*/
Import getImportEntity(final JobExecution jobExecution) throws Exception;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c652c39b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 7339087,b8431fa..1357ccb
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@@ -27,6 -28,9 +28,7 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.entities.FileImport;
import org.apache.usergrid.persistence.entities.Import;
import org.apache.usergrid.persistence.entities.JobData;
+
-import org.aspectj.lang.annotation.Before;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
@@@ -44,7 -48,18 +46,17 @@@ import java.io.File
import java.io.IOException;
import java.util.*;
+ import javax.annotation.PostConstruct;
+
+
import org.apache.usergrid.persistence.index.query.Query.Level;
-import org.apache.usergrid.persistence.index.utils.UUIDUtils;
+ import org.apache.usergrid.persistence.queue.QueueManager;
+ import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+ import org.apache.usergrid.persistence.queue.QueueScope;
+ import org.apache.usergrid.persistence.queue.QueueScopeFactory;
+ import org.apache.usergrid.services.ServiceManagerFactory;
+ import org.apache.usergrid.services.queues.ImportQueueListener;
+ import org.apache.usergrid.services.queues.ImportQueueMessage;
public class ImportServiceImpl implements ImportService {
@@@ -62,16 -77,39 +74,38 @@@
//dependency injection
private SchedulerService sch;
+ private ServiceManagerFactory smf;
+
+ //Dependency injection through spring
+ private QueueManager qm;
+
+ private QueueManagerFactory queueManagerFactory;
+
-
//inject Management Service to access Organization Data
private ManagementService managementService;
private JsonFactory jsonFactory = new JsonFactory();
++
+ @PostConstruct
+ public void init(){
++
+ //TODO: move this to a before or initialization method.
+
+ //TODO: made queueName clearly defined.
+ //smf = getApplicationContext().getBean(ServiceManagerFactory.class);
+
+ String name = ImportQueueListener.QUEUE_NAME;
+ QueueScopeFactory queueScopeFactory = CpSetup.getInjector().getInstance(QueueScopeFactory.class);
+ QueueScope queueScope = queueScopeFactory.getScope(CpNamingUtils.MANAGEMENT_APPLICATION_ID, name);
+ queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
+ qm = queueManagerFactory.getQueueManager(queueScope);
-
+ }
+
/**
* This schedules the main import Job
*
* @param config configuration of the job to be scheduled
* @return it returns the UUID of the scheduled job
-- * @throws Exception
*/
@Override
public UUID schedule(Map<String, Object> config) throws Exception {
@@@ -258,27 -302,19 +299,32 @@@
/**
* Returns the File Import Entity that stores all meta-data for the particular sub File import Job
- *
- * @param jobExecution the file import job details
* @return File Import Entity
-- * @throws Exception
+ */
+ @Override
+ public FileImport getFileImportEntity(final ImportQueueMessage queueMessage) throws Exception {
+
+ EntityManager em = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+
+ return em.get(queueMessage.getFileId(), FileImport.class);
+ }
+
+
++ /**
++ * Returns the File Import Entity that stores all meta-data for the particular sub File import Job
++ * @return File Import Entity
+ */
+ @Override
+ public FileImport getFileImportEntity(final JobExecution jobExecution) throws Exception {
+
+ UUID fileImportId = (UUID) jobExecution.getJobData().getProperty(FILE_IMPORT_ID);
++
+ EntityManager em = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+
+ return em.get(fileImportId, FileImport.class);
+ }
+
- /**
- * This returns the temporary files downloaded form s3
- */
- // @Override
- // public ArrayList<File> getEphemeralFile() {
- // return files;
- // }
+
public SchedulerService getSch() {
return sch;
}
@@@ -557,21 -591,21 +603,33 @@@
}
-- /**
-- * The loops through each temp file and parses it to store the entities from the json back into usergrid
-- *
-- * @throws Exception
-- */
@Override
- public void parseFileToEntities(JobExecution jobExecution) throws Exception {
++ // TODO: ImportService should not have to know about ImportQueueMessage
+ public void parseFileToEntities(ImportQueueMessage queueMessage) throws Exception {
- logger.debug("parseFileToEntities() for job {} status {}",
- jobExecution.getJobName(), jobExecution.getStatus().toString());
- logger.debug("parseFileToEntities() for job {} ",
- queueMessage.getFileName());
-
- // add properties to the import entity
+ FileImport fileImport = getFileImportEntity(queueMessage);
-
+ File file = new File(queueMessage.getFileName());
++ UUID targetAppId = queueMessage.getApplicationId();
++
++ parseFileToEntities( fileImport, file, targetAppId );
++ }
+
- // add properties to the import entity
- FileImport fileImport = getFileImportEntity(jobExecution);
+
++ @Override
++ // TODO: ImportService should not have to know about JobExecution
++ public void parseFileToEntities(JobExecution jobExecution) throws Exception {
++
++ FileImport fileImport = getFileImportEntity(jobExecution);
+ File file = new File(jobExecution.getJobData().getProperty("File").toString());
++ UUID targetAppId = (UUID) jobExecution.getJobData().getProperty("applicationId");
++
++ parseFileToEntities( fileImport, file, targetAppId );
++ }
++
++
++ public void parseFileToEntities( FileImport fileImport, File file, UUID targetAppId ) throws Exception {
++
++ logger.debug("parseFileToEntities() for file {} ", file.getAbsolutePath());
EntityManager emManagementApp = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
emManagementApp.update(fileImport);
@@@ -588,46 -622,44 +646,43 @@@
fileImport.setState(FileImport.State.STARTED);
emManagementApp.update(fileImport);
-- // Get target application ID from the job data (NOT from the filename)
- UUID targetAppId = (UUID) jobExecution.getJobData().getProperty("applicationId");
- UUID targetAppId = queueMessage.getApplicationId();
--
- if ( emManagementApp.get( targetAppId ) == null ) {
+ if (emManagementApp.get(targetAppId) == null) {
throw new IllegalArgumentException("Application does not exist: " + targetAppId.toString());
}
- EntityManager targetEm = emf.getEntityManager( targetAppId );
+ EntityManager targetEm = emf.getEntityManager(targetAppId);
logger.debug(" importing into app {} file {}", targetAppId.toString(), file.getAbsolutePath());
- importEntitiesFromFile(file, targetEm, emManagementApp, fileImport, jobExecution);
- JsonParser jp = getJsonParserForFile(file);
-
- // in case of resume, retrieve the last updated UUID for this file
- String lastUpdatedUUID = fileImport.getLastUpdatedUUID();
-
- // this handles partially completed files by updating entities from the point of failure
- if (!lastUpdatedUUID.equals(" ")) {
-
- // go till the last updated entity
- while (!jp.getText().equals(lastUpdatedUUID)) {
- jp.nextToken();
- }
-
- // skip the last one and start from the next one
- while (!(jp.getCurrentToken() == JsonToken.END_OBJECT
- && jp.nextToken() == JsonToken.START_OBJECT)) {
- jp.nextToken();
- }
- }
++ importEntitiesFromFile(file, targetEm, emManagementApp, fileImport );
- // get to start of an object i.e next entity.
- while (jp.getCurrentToken() != JsonToken.START_OBJECT) {
- jp.nextToken();
- }
+ // TODO: fix the resume on error feature
- while (jp.nextToken() != JsonToken.END_ARRAY) {
- importEntitiesFromFile(jp, targetEm, emManagementApp, fileImport);
- }
- jp.close();
+// // in case of resume, retrieve the last updated UUID for this file
+// String lastUpdatedUUID = fileImport.getLastUpdatedUUID();
+//
+// // this handles partially completed files by updating entities from the point of failure
+// if (!lastUpdatedUUID.equals(" ")) {
+//
+// // go till the last updated entity
+// while (!jp.getText().equals(lastUpdatedUUID)) {
+// jp.nextToken();
+// }
+//
+// // skip the last one and start from the next one
+// while (!(jp.getCurrentToken() == JsonToken.END_OBJECT
+// && jp.nextToken() == JsonToken.START_OBJECT)) {
+// jp.nextToken();
+// }
+// }
+//
+// // get to start of an object i.e next entity.
+// while (jp.getCurrentToken() != JsonToken.START_OBJECT) {
+// jp.nextToken();
+// }
+//
+// while (jp.nextToken() != JsonToken.END_ARRAY) {
+// importEntitiesFromFile(jp, targetEm, emManagementApp, fileImport, jobExecution);
+// }
+// jp.close();
// Updates the state of file import job
if (!fileImport.getState().toString().equals("FAILED")) {
@@@ -711,33 -741,26 +766,32 @@@
}
-
/**
* Imports the entity's connecting references (collections, connections and dictionaries)
- * @param jp JsonParser pointing to the beginning of the object.
- * @param em Entity Manager for the application being imported
- * @param rootEm Entity manager for the root applicaition
- * @param fileImport the file import entity
+ *
- * @param jp JsonParser pointing to the beginning of the object.
++ * @param file The file to be imported
+ * @param em Entity Manager for the application being imported
+ * @param rootEm Entity manager for the root applicaition
- * @param fileImport the file import entity
- * @param jobExecution execution details for the import jbo
++ * @param fileImport The file import entity
*/
private void importEntitiesFromFile(
- final JsonParser jp,
+ final File file,
final EntityManager em,
final EntityManager rootEm,
- final FileImport fileImport,
- final JobExecution jobExecution) throws Exception {
+ final FileImport fileImport) throws Exception {
- final JsonParserObservable subscribe = new JsonParserObservable(jp, em, rootEm, fileImport);
- final Observable<WriteEvent> observable = Observable.create(subscribe);
+ // first we do entities
+ boolean entitiesOnly = true;
- // This is the action we want to perform for every UUID we receive
+ // observable that parses JSON and emits write events
+ JsonParser jp = getJsonParserForFile(file);
+ final JsonEntityParserObservable jsonObservableEntities =
+ new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
+ final Observable<WriteEvent> entityEventObservable = Observable.create(jsonObservableEntities);
+
+ // function to execute for each write event
+ //TODO: job execution no longer needed due to having queueMessage.
final Action1<WriteEvent> doWork = new Action1<WriteEvent>() {
@Override
public void call(WriteEvent writeEvent) {
@@@ -792,30 -818,6 +846,30 @@@
}
}, Schedulers.io()).toBlocking().last();
+ jp.close();
+
+ logger.debug("\n\nWrote entities\n");
+
+ // now do other stuff: connections and dictionaries
+ entitiesOnly = false;
+
+ // observable that parses JSON and emits write events
+ jp = getJsonParserForFile(file);
+ final JsonEntityParserObservable jsonObservableOther =
+ new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
+ final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther);
+
+ otherEventObservable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+ @Override
+ public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+ return entityWrapperObservable.doOnNext(doWork);
+
+ }
+ }, Schedulers.io()).toBlocking().last();
-
++
+ jp.close();
+
+ logger.debug("\n\nWrote others\n");
}
@@@ -932,22 -927,14 +986,22 @@@
// adds map to the dictionary
@Override
- public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
+ public void doWrite(EntityManager em, FileImport fileImport) {
EntityManager rootEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
try {
+
+ logger.debug("Adding map to {}:{} dictionary {}",
+ new Object[]{ownerEntityRef.getType(), ownerEntityRef.getType(), dictionaryName});
+
em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
+
} catch (Exception e) {
+ logger.error("Error writing dictionary", e);
fileImport.setErrorMessage(e.getMessage());
try {
+
rootEm.update(fileImport);
+
} catch (Exception ex) {
// TODO should we abort at this point?
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c652c39b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
index 53bf144,07e0798..922c16a
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
@@@ -29,11 -35,11 +35,10 @@@ import com.google.inject.Inject
import com.google.inject.Singleton;
--/**
-- * Created by ApigeeCorporation on 1/15/15.
-- *///TODO: make sure this is properly instantiated by guice
++//TODO: make sure this is properly instantiated by guice
@Singleton
public class ImportQueueListener extends QueueListener {
++
/**
* Initializes the QueueListener. Need to wire the factories up in guice.
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c652c39b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
----------------------------------------------------------------------
diff --cc stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
index 742c33e,f9a0753..b9c3bb9
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
@@@ -97,12 -103,12 +103,13 @@@ public class ImportServiceIT
applicationId = setup.getMgmtSvc().createApplication( organization.getUuid(), username+"app" ).getId();
}
++
@Before
public void before() {
boolean configured =
- !StringUtils.isEmpty(System.getProperty("secretKey"))
- && !StringUtils.isEmpty(System.getProperty("accessKey"))
+ !StringUtils.isEmpty(System.getProperty( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR))
- && !StringUtils.isEmpty(System.getProperty(SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR))
++ && !StringUtils.isEmpty(System.getProperty( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR))
&& !StringUtils.isEmpty(System.getProperty("bucketName"));
if ( !configured ) {
@@@ -416,7 -445,7 +439,7 @@@
while ( !exportService.getState( exportUUID ).equals( "FINISHED" ) ) {
;
}
-- //TODo: can check if the temp files got created
++ //TODO: can check if the temp files got created
// import
S3Import s3Import = new S3ImportImpl();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c652c39b/stack/services/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --cc stack/services/src/test/resources/log4j.properties
index 3ef4d3b,3ef4d3b..a63d719
--- a/stack/services/src/test/resources/log4j.properties
+++ b/stack/services/src/test/resources/log4j.properties
@@@ -57,6 -57,6 +57,7 @@@ log4j.logger.org.apache.usergrid.lockin
#log4j.logger.org.apache.usergrid.corepersistence=DEBUG
#log4j.logger.org.apache.usergrid.persistence.index=DEBUG
++#log4j.logger.org.apache.usergrid.batch=DEBUG
log4j.logger.org.apache.usergrid.management.export=DEBUG
log4j.logger.org.apache.usergrid.management.importer=DEBUG
[2/3] incubator-usergrid git commit: Rewrote import parser,
fixed some error handing and now ImportServiceIT tests now passing.
Posted by sn...@apache.org.
Rewrote import parser, fixed some error handing and now ImportServiceIT tests now passing.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b22c37ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b22c37ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b22c37ae
Branch: refs/heads/two-dot-o-import
Commit: b22c37aeefeb8c9c84b2170e24c966d15112fd6f
Parents: 858cbc0
Author: Dave Johnson <dm...@apigee.com>
Authored: Tue Feb 3 09:26:26 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Tue Feb 3 09:26:26 2015 -0500
----------------------------------------------------------------------
stack/services/pom.xml | 3 +-
.../management/importer/FileImportJob.java | 42 ++-
.../usergrid/management/importer/ImportJob.java | 41 ++-
.../management/importer/ImportServiceImpl.java | 329 ++++++++++---------
.../management/importer/ImportServiceIT.java | 8 +-
5 files changed, 239 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b22c37ae/stack/services/pom.xml
----------------------------------------------------------------------
diff --git a/stack/services/pom.xml b/stack/services/pom.xml
index 83d49e3..820020d 100644
--- a/stack/services/pom.xml
+++ b/stack/services/pom.xml
@@ -56,7 +56,7 @@
<directory>src/main/resources</directory>
<filtering>true</filtering>
<includes>
- <include>**/*.xml</include>
+ <include>**/*.xml</include>
</includes>
</resource>
<resource>
@@ -72,6 +72,7 @@
<include>**/*.xml</include>
<include>**/*.properties</include>
<include>**/*.p12</include>
+ <include>**/*.json</include>
</includes>
</testResource>
<testResource>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b22c37ae/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
index 25295fc..cb83ce8 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
@@ -42,8 +42,8 @@ public class FileImportJob extends OnlyOnceJob {
public static final String FILE_IMPORT_ID = "fileImportId";
private static final Logger logger = LoggerFactory.getLogger(FileImportJob.class);
- // injected the Entity Manager Factory
- protected EntityManagerFactory emf;
+ @Autowired
+ EntityManagerFactory emf;
@Autowired
ImportService importService;
@@ -54,20 +54,34 @@ public class FileImportJob extends OnlyOnceJob {
@Override
protected void doJob(JobExecution jobExecution) throws Exception {
- logger.info( "execute FileImportJob {}", jobExecution.toString() );
-
- JobData jobData = jobExecution.getJobData();
- if ( jobData == null ) {
- logger.error( "jobData cannot be null" );
- return;
+ logger.info("execute FileImportJob {}", jobExecution.toString());
+
+ try {
+ JobData jobData = jobExecution.getJobData();
+ if (jobData == null) {
+ logger.error("jobData cannot be null");
+ return;
+ }
+
+ // heartbeat to indicate job has started
+ jobExecution.heartbeat();
+
+ // call the File Parser for the file set in job execution
+ importService.parseFileToEntities(jobExecution);
+
+ } catch ( Throwable t ) {
+ logger.debug("Error importing file", t);
+
+ // update file import record
+ UUID fileImportId = (UUID) jobExecution.getJobData().getProperty(FILE_IMPORT_ID);
+ EntityManager em = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+ FileImport fileImport = em.get(fileImportId, FileImport.class);
+ fileImport.setState( FileImport.State.FAILED );
+ em.update( fileImport );
+
+ throw t;
}
- // heartbeat to indicate job has started
- jobExecution.heartbeat();
-
- // call the File Parser for the file set in job execution
- importService.parseFileToEntities(jobExecution);
-
logger.error("File Import Service completed job");
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b22c37ae/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
index 24393cd..995850c 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
@@ -23,6 +23,7 @@ import org.apache.usergrid.batch.job.OnlyOnceJob;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.entities.FileImport;
import org.apache.usergrid.persistence.entities.Import;
import org.apache.usergrid.persistence.entities.JobData;
import org.slf4j.Logger;
@@ -30,6 +31,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.UUID;
+
@Component("importJob")
public class ImportJob extends OnlyOnceJob {
@@ -37,8 +40,9 @@ public class ImportJob extends OnlyOnceJob {
public static final String IMPORT_ID = "importId";
private static final Logger logger = LoggerFactory.getLogger(ImportJob.class);
- //injected the Entity Manager Factory
+ @Autowired
protected EntityManagerFactory emf;
+
@Autowired
ImportService importService;
@@ -50,18 +54,33 @@ public class ImportJob extends OnlyOnceJob {
protected void doJob(JobExecution jobExecution) throws Exception {
logger.info( "execute ImportJob {}", jobExecution.getJobId().toString() );
- JobData jobData = jobExecution.getJobData();
- if ( jobData == null ) {
- logger.error( "jobData cannot be null" );
- return;
- }
+ try {
+ JobData jobData = jobExecution.getJobData();
+ if (jobData == null) {
+ logger.error("jobData cannot be null");
+ return;
+ }
- // heartbeat to indicate job has started
- jobExecution.heartbeat();
+ // heartbeat to indicate job has started
+ jobExecution.heartbeat();
- // call the doImport method from import service which
- // schedules the sub-jobs i.e. parsing of files to FileImport Job
- importService.doImport( jobExecution );
+ // call the doImport method from import service which
+ // schedules the sub-jobs i.e. parsing of files to FileImport Job
+ importService.doImport(jobExecution);
+
+ } catch ( Throwable t ) {
+ logger.error("Error calling in importJob", t);
+
+ // update import job record
+ UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
+ EntityManager mgmtApp = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+ Import importEntity = mgmtApp.get(importId, Import.class);
+ importEntity.setState(Import.State.FAILED);
+ importEntity.setErrorMessage(t.getMessage());
+ mgmtApp.update(importEntity);
+
+ throw t;
+ }
logger.error("Import Service completed job");
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b22c37ae/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index e78675b..7339087 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -24,7 +24,6 @@ import org.apache.usergrid.management.ApplicationInfo;
import org.apache.usergrid.management.ManagementService;
import org.apache.usergrid.management.OrganizationInfo;
import org.apache.usergrid.persistence.*;
-import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.entities.FileImport;
import org.apache.usergrid.persistence.entities.Import;
import org.apache.usergrid.persistence.entities.JobData;
@@ -84,7 +83,7 @@ public class ImportServiceImpl implements ImportService {
EntityManager rootEm = null;
try {
- rootEm = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+ rootEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
Set<String> collections = rootEm.getApplicationCollections();
if (!collections.contains("imports")) {
rootEm.createApplicationCollection("imports");
@@ -132,10 +131,10 @@ public class ImportServiceImpl implements ImportService {
* @return it returns the UUID of the scheduled job
* @throws Exception
*/
- public UUID scheduleFile( Map<String, Object> config, String file, EntityRef importRef) throws Exception {
+ public UUID scheduleFile(Map<String, Object> config, String file, EntityRef importRef) throws Exception {
logger.debug("scheduleFile() for import {}:{} file {}",
- new Object[] { importRef.getType(), importRef.getType(), file});
+ new Object[]{importRef.getType(), importRef.getType(), file});
EntityManager rootEm = null;
@@ -172,9 +171,9 @@ public class ImportServiceImpl implements ImportService {
//set data to be transferred to the FileImport Job
JobData jobData = new JobData();
- jobData.setProperty( "File", file);
- jobData.setProperty( FILE_IMPORT_ID, fileImport.getUuid());
- jobData.addProperties( config );
+ jobData.setProperty("File", file);
+ jobData.setProperty(FILE_IMPORT_ID, fileImport.getUuid());
+ jobData.addProperties(config);
long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
@@ -242,6 +241,7 @@ public class ImportServiceImpl implements ImportService {
/**
* Returns the Import Entity that stores all meta-data for the particular import Job
+ *
* @param jobExecution the import job details
* @return Import Entity
* @throws Exception
@@ -258,6 +258,7 @@ public class ImportServiceImpl implements ImportService {
/**
* Returns the File Import Entity that stores all meta-data for the particular sub File import Job
+ *
* @param jobExecution the file import job details
* @return File Import Entity
* @throws Exception
@@ -278,7 +279,6 @@ public class ImportServiceImpl implements ImportService {
// public ArrayList<File> getEphemeralFile() {
// return files;
// }
-
public SchedulerService getSch() {
return sch;
}
@@ -312,6 +312,7 @@ public class ImportServiceImpl implements ImportService {
/**
* This method gets the files from s3 and also creates sub-jobs for each file i.e. File Import Jobs
+ *
* @param jobExecution the job created by the scheduler with all the required config data
*/
@Override
@@ -375,7 +376,7 @@ public class ImportServiceImpl implements ImportService {
// import All the applications from an organization
//importApplicationsFromOrg(
- //(UUID) config.get("organizationId"), config, jobExecution, s3Import);
+ //(UUID) config.get("organizationId"), config, jobExecution, s3Import);
} else if (config.get("collectionName") == null) {
@@ -383,13 +384,13 @@ public class ImportServiceImpl implements ImportService {
// imports an Application from a single organization
//importApplicationFromOrg( (UUID) config.get("organizationId"),
- // (UUID) config.get("applicationId"), config, jobExecution, s3Import);
+ // (UUID) config.get("applicationId"), config, jobExecution, s3Import);
} else {
// imports a single collection from an app org combo
files = importCollectionFromOrgApp(
- (UUID)config.get("organizationId"), (UUID)config.get("applicationId"),
+ (UUID) config.get("organizationId"), (UUID) config.get("applicationId"),
config, jobExecution, s3Import);
}
}
@@ -417,7 +418,7 @@ public class ImportServiceImpl implements ImportService {
// TODO SQS: replace the method inside here so that it uses sqs instead of internal q
- UUID jobID = scheduleFile( config, file.getPath(), importUG);
+ UUID jobID = scheduleFile(config, file.getPath(), importUG);
Map<String, Object> fileJobID = new HashMap<String, Object>();
fileJobID.put("FileName", file.getName());
@@ -457,9 +458,9 @@ public class ImportServiceImpl implements ImportService {
String collectionName = config.get("collectionName").toString();
String appFileName = prepareCollectionInputFileName(
- organizationInfo.getName(), application.getName(), collectionName );
+ organizationInfo.getName(), application.getName(), collectionName);
- return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.COLLECTION);
+ return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.COLLECTION);
}
@@ -506,30 +507,31 @@ public class ImportServiceImpl implements ImportService {
}
// prepares the prefix path for the files to be import depending on the endpoint being hit
- String appFileName = prepareOrganizationInputFileName( organizationInfo.getName());
+ String appFileName = prepareOrganizationInputFileName(organizationInfo.getName());
return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.ORGANIZATION);
}
- protected String prepareCollectionInputFileName(String orgName, String appName, String collectionName ) {
+ protected String prepareCollectionInputFileName(String orgName, String appName, String collectionName) {
return orgName + "/" + appName + "." + collectionName + ".";
}
- protected String prepareApplicationInputFileName(String orgName, String appName ) {
+ protected String prepareApplicationInputFileName(String orgName, String appName) {
return orgName + "/" + appName + ".";
}
- protected String prepareOrganizationInputFileName(String orgName ) {
+ protected String prepareOrganizationInputFileName(String orgName) {
return orgName + "/";
}
/**
* Copies file from S3.
+ *
* @param importUG Import instance
* @param appFileName the base file name for the files to be downloaded
* @param config the config information for the import job
@@ -537,7 +539,7 @@ public class ImportServiceImpl implements ImportService {
* @param type it indicates the type of import
*/
public ArrayList<File> copyFileFromS3(Import importUG, String appFileName,
- Map<String, Object> config, S3Import s3Import, ImportType type) throws Exception {
+ Map<String, Object> config, S3Import s3Import, ImportType type) throws Exception {
EntityManager rootEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
ArrayList<File> copyFiles = new ArrayList<>();
@@ -587,43 +589,45 @@ public class ImportServiceImpl implements ImportService {
emManagementApp.update(fileImport);
// Get target application ID from the job data (NOT from the filename)
- UUID targetAppId = (UUID)jobExecution.getJobData().getProperty("applicationId");
+ UUID targetAppId = (UUID) jobExecution.getJobData().getProperty("applicationId");
- if ( emManagementApp.get( targetAppId ) == null ) {
+ if (emManagementApp.get(targetAppId) == null) {
throw new IllegalArgumentException("Application does not exist: " + targetAppId.toString());
}
- EntityManager targetEm = emf.getEntityManager( targetAppId );
+ EntityManager targetEm = emf.getEntityManager(targetAppId);
logger.debug(" importing into app {} file {}", targetAppId.toString(), file.getAbsolutePath());
- JsonParser jp = getJsonParserForFile(file);
-
- // in case of resume, retrieve the last updated UUID for this file
- String lastUpdatedUUID = fileImport.getLastUpdatedUUID();
-
- // this handles partially completed files by updating entities from the point of failure
- if (!lastUpdatedUUID.equals(" ")) {
+ importEntitiesFromFile(file, targetEm, emManagementApp, fileImport, jobExecution);
- // go till the last updated entity
- while (!jp.getText().equals(lastUpdatedUUID)) {
- jp.nextToken();
- }
-
- // skip the last one and start from the next one
- while (!(jp.getCurrentToken() == JsonToken.END_OBJECT
- && jp.nextToken() == JsonToken.START_OBJECT)) {
- jp.nextToken();
- }
- }
+ // TODO: fix the resume on error feature
- // get to start of an object i.e next entity.
- while (jp.getCurrentToken() != JsonToken.START_OBJECT) {
- jp.nextToken();
- }
-
- while (jp.nextToken() != JsonToken.END_ARRAY) {
- importEntitiesFromFile(jp, targetEm, emManagementApp, fileImport, jobExecution);
- }
- jp.close();
+// // in case of resume, retrieve the last updated UUID for this file
+// String lastUpdatedUUID = fileImport.getLastUpdatedUUID();
+//
+// // this handles partially completed files by updating entities from the point of failure
+// if (!lastUpdatedUUID.equals(" ")) {
+//
+// // go till the last updated entity
+// while (!jp.getText().equals(lastUpdatedUUID)) {
+// jp.nextToken();
+// }
+//
+// // skip the last one and start from the next one
+// while (!(jp.getCurrentToken() == JsonToken.END_OBJECT
+// && jp.nextToken() == JsonToken.START_OBJECT)) {
+// jp.nextToken();
+// }
+// }
+//
+// // get to start of an object i.e next entity.
+// while (jp.getCurrentToken() != JsonToken.START_OBJECT) {
+// jp.nextToken();
+// }
+//
+// while (jp.nextToken() != JsonToken.END_ARRAY) {
+// importEntitiesFromFile(jp, targetEm, emManagementApp, fileImport, jobExecution);
+// }
+// jp.close();
// Updates the state of file import job
if (!fileImport.getState().toString().equals("FAILED")) {
@@ -642,7 +646,7 @@ public class ImportServiceImpl implements ImportService {
UUID importId = importEntity.get(0).getUuid();
Import importUG = emManagementApp.get(importId, Import.class);
- Results entities = emManagementApp.getConnectedEntities( importUG, "includes", null, Level.ALL_PROPERTIES);
+ Results entities = emManagementApp.getConnectedEntities(importUG, "includes", null, Level.ALL_PROPERTIES);
List<Entity> importFile = entities.getEntities();
int count = 0;
@@ -667,9 +671,10 @@ public class ImportServiceImpl implements ImportService {
/**
* Checks if a file is a valid JSON
+ *
* @param collectionFile the file being validated
- * @param rootEm the Entity Manager for the Management application
- * @param fileImport the file import entity
+ * @param rootEm the Entity Manager for the Management application
+ * @param fileImport the file import entity
* @return
* @throws Exception
*/
@@ -696,6 +701,7 @@ public class ImportServiceImpl implements ImportService {
/**
* Gets the JSON parser for given file
+ *
* @param collectionFile the file for which JSON parser is required
*/
private JsonParser getJsonParserForFile(File collectionFile) throws Exception {
@@ -705,27 +711,33 @@ public class ImportServiceImpl implements ImportService {
}
-
/**
* Imports the entity's connecting references (collections, connections and dictionaries)
- * @param jp JsonParser pointing to the beginning of the object.
- * @param em Entity Manager for the application being imported
- * @param rootEm Entity manager for the root applicaition
- * @param fileImport the file import entity
- * @param jobExecution execution details for the import jbo
+ *
+ * @param jp JsonParser pointing to the beginning of the object.
+ * @param em Entity Manager for the application being imported
+ * @param rootEm Entity manager for the root applicaition
+ * @param fileImport the file import entity
+ * @param jobExecution execution details for the import jbo
*/
private void importEntitiesFromFile(
- final JsonParser jp,
+ final File file,
final EntityManager em,
final EntityManager rootEm,
final FileImport fileImport,
final JobExecution jobExecution) throws Exception {
- final JsonParserObservable subscribe = new JsonParserObservable(jp, em, rootEm, fileImport);
- final Observable<WriteEvent> observable = Observable.create(subscribe);
+ // first we do entities
+ boolean entitiesOnly = true;
- // This is the action we want to perform for every UUID we receive
+ // observable that parses JSON and emits write events
+ JsonParser jp = getJsonParserForFile(file);
+ final JsonEntityParserObservable jsonObservableEntities =
+ new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
+ final Observable<WriteEvent> entityEventObservable = Observable.create(jsonObservableEntities);
+
+ // function to execute for each write event
final Action1<WriteEvent> doWork = new Action1<WriteEvent>() {
@Override
public void call(WriteEvent writeEvent) {
@@ -736,16 +748,13 @@ public class ImportServiceImpl implements ImportService {
// final AtomicLong entityCounter = new AtomicLong();
// final AtomicLong eventCounter = new AtomicLong();
- // This is boilerplate glue code. We have to follow this for the parallel operation.
- // In the "call" method we want to simply return the input observable + the chain of
- // operations we want to invoke
-
- observable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+ // start parsing JSON
+ entityEventObservable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
@Override
public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
- // TODO: need to fixed so that number of entities created can be counted correctly and
- // TODO: also update last updated UUID for fileImport which is a must for resume-ability
+ // TODO: need to fixed so that number of entities created can be counted correctly and
+ // TODO: also update last updated UUID for fileImport which is a must for resume-ability
// return entityWrapperObservable.doOnNext(doWork).doOnNext(new Action1<WriteEvent>() {
//
@@ -783,6 +792,30 @@ public class ImportServiceImpl implements ImportService {
}
}, Schedulers.io()).toBlocking().last();
+ jp.close();
+
+ logger.debug("\n\nWrote entities\n");
+
+ // now do other stuff: connections and dictionaries
+ entitiesOnly = false;
+
+ // observable that parses JSON and emits write events
+ jp = getJsonParserForFile(file);
+ final JsonEntityParserObservable jsonObservableOther =
+ new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
+ final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther);
+
+ otherEventObservable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+ @Override
+ public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+ return entityWrapperObservable.doOnNext(doWork);
+
+ }
+ }, Schedulers.io()).toBlocking().last();
+
+ jp.close();
+
+ logger.debug("\n\nWrote others\n");
}
@@ -813,7 +846,7 @@ public class ImportServiceImpl implements ImportService {
try {
logger.debug("Writing imported entity {}:{} into app {}",
- new Object[] { entityType, entityUuid, em.getApplication().getUuid() });
+ new Object[]{entityType, entityUuid, em.getApplication().getUuid()});
em.create(entityUuid, entityType, properties);
@@ -858,14 +891,14 @@ public class ImportServiceImpl implements ImportService {
// TODO: what happens if ConnectionEvents happen before all entities are saved?
// Connections are specified as UUIDs with no type
- if ( entityRef.getType() == null ) {
+ if (entityRef.getType() == null) {
entityRef = em.get(ownerEntityRef.getUuid());
}
logger.debug("Creating connection from {}:{} to {}:{}",
- new Object[] {
+ new Object[]{
ownerEntityRef.getType(), ownerEntityRef.getUuid(),
- entityRef.getType(), entityRef.getUuid() });
+ entityRef.getType(), entityRef.getUuid()});
em.createConnection(ownerEntityRef, connectionType, entityRef);
@@ -902,9 +935,9 @@ public class ImportServiceImpl implements ImportService {
public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
EntityManager rootEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
try {
-
+
logger.debug("Adding map to {}:{} dictionary {}",
- new Object[] {ownerEntityRef.getType(), ownerEntityRef.getType(), dictionaryName });
+ new Object[]{ownerEntityRef.getType(), ownerEntityRef.getType(), dictionaryName});
em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
@@ -926,131 +959,120 @@ public class ImportServiceImpl implements ImportService {
}
- private final class JsonParserObservable implements Observable.OnSubscribe<WriteEvent> {
+ private final class JsonEntityParserObservable implements Observable.OnSubscribe<WriteEvent> {
private final JsonParser jp;
EntityManager em;
EntityManager rootEm;
FileImport fileImport;
+ boolean entitiesOnly;
- JsonParserObservable(JsonParser parser, EntityManager em, EntityManager rootEm, FileImport fileImport) {
+ JsonEntityParserObservable(
+ JsonParser parser,
+ EntityManager em,
+ EntityManager rootEm,
+ FileImport fileImport,
+ boolean entitiesOnly) {
this.jp = parser;
this.em = em;
this.rootEm = rootEm;
this.fileImport = fileImport;
+ this.entitiesOnly = entitiesOnly;
}
@Override
public void call(final Subscriber<? super WriteEvent> subscriber) {
-
- // have to do this in two passes so that entities are created before connections
-
- // first entities
- process( subscriber, true );
-
- // next connections and dictionaries
- process( subscriber, false);
+ process(subscriber);
}
- private void process(final Subscriber<? super WriteEvent> subscriber, boolean entities ) {
-
- logger.debug("process(): entities = " + entities );
-
- EntityRef lastEntitySeenRef = null;
+ private void process(final Subscriber<? super WriteEvent> subscriber) {
try {
- while (!subscriber.isUnsubscribed() && jp.nextToken() != JsonToken.END_OBJECT) {
-
- String collectionName = jp.getCurrentName();
+ boolean done = false;
+ Stack tokenStack = new Stack();
+ EntityRef lastEntity = null;
- logger.debug("Processing currentName: " + jp.getCurrentName());
+ while (!done) {
- // create the wrapper for connections
- if (collectionName != null && collectionName.equals("connections")) {
-
- jp.nextToken(); // START_OBJECT
- while (jp.nextToken() != JsonToken.END_OBJECT) {
- String connectionType = jp.getCurrentName();
-
- jp.nextToken(); // START_ARRAY
- while (jp.nextToken() != JsonToken.END_ARRAY) {
- String entryId = jp.getText();
-
- if ( !entities ) {
- EntityRef entryRef = new SimpleEntityRef(UUID.fromString(entryId));
- WriteEvent entityWrapper = new ConnectionEvent(lastEntitySeenRef, connectionType, entryRef);
- subscriber.onNext(entityWrapper);
- }
- }
- }
+ JsonToken token = jp.nextToken();
+ String name = jp.getCurrentName();
+ String indent = "";
+ for (int i = 0; i < tokenStack.size(); i++) {
+ indent += " ";
}
- // create the wrapper for dictionaries
- else if (collectionName != null && collectionName.equals("dictionaries")) {
- jp.nextToken(); // START_OBJECT
- while (jp.nextToken() != JsonToken.END_OBJECT) {
+ logger.debug("{}Token {} name {}", new Object[]{indent, token, name});
- String dictionaryName = jp.getCurrentName();
- jp.nextToken();
- Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
+ if (token.equals(JsonToken.START_OBJECT) && "Metadata".equals(name)) {
- if ( !entities ) {
- WriteEvent entityWrapper = new DictionaryEvent(
- lastEntitySeenRef, dictionaryName, dictionary);
- subscriber.onNext(entityWrapper);
- }
- }
- subscriber.onCompleted();
+ Map<String, Object> entityMap = jp.readValueAs(HashMap.class);
- } else {
+ String type = (String) entityMap.get("type");
+ UUID uuid = UUID.fromString((String) entityMap.get("uuid"));
+ lastEntity = new SimpleEntityRef(type, uuid);
- // Regular collections
+ logger.debug("{}Got entity with uuid {}", indent, lastEntity);
+ if (entitiesOnly) {
+ WriteEvent event = new EntityEvent(uuid, type, entityMap);
+ subscriber.onNext(event);
+ }
- jp.nextToken(); // START_OBJECT
+ } else if (token.equals(JsonToken.START_OBJECT) && "connections".equals(name)) {
- JsonToken token = jp.nextToken();
+ Map<String, Object> connectionMap = jp.readValueAs(HashMap.class);
- Map<String, Object> properties = new HashMap<>();
+ for (String type : connectionMap.keySet()) {
+ List targets = (List) connectionMap.get(type);
- String entityUuid = null;
- String entityType = null;
+ for (Object targetObject : targets) {
+ UUID target = UUID.fromString((String) targetObject);
- while (token != JsonToken.END_OBJECT) {
+ logger.debug("{}Got connection {} to {}",
+ new Object[]{indent, type, target.toString()});
- if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) {
+ if (!entitiesOnly) {
+ EntityRef entryRef = new SimpleEntityRef(target);
+ WriteEvent event = new ConnectionEvent(lastEntity, type, entryRef);
+ subscriber.onNext(event);
+ }
+ }
+ }
- String key = jp.getCurrentName();
- logger.debug(" currentName: " + jp.getText());
+ } else if (token.equals(JsonToken.START_OBJECT) && "dictionaries".equals(name)) {
- if (key.equals("uuid")) {
- entityUuid = jp.getText();
+ Map<String, Object> dictionariesMap = jp.readValueAs(HashMap.class);
+ for (String dname : dictionariesMap.keySet()) {
+ Map dmap = (Map) dictionariesMap.get(dname);
- } else if (key.equals("type")) {
- entityType = jp.getText();
+ logger.debug("{}Got dictionary {} size {}",
+ new Object[] {indent, dname, dmap.size() });
- } else if (key.length() != 0 && jp.getText().length() != 0) {
- String value = jp.getText();
- properties.put(key, value);
- }
+ if (!entitiesOnly) {
+ WriteEvent event = new DictionaryEvent(lastEntity, dname, dmap);
+ subscriber.onNext(event);
}
- token = jp.nextToken();
}
- if ( entities ) {
- WriteEvent entityWrapper = new EntityEvent(
- UUID.fromString(entityUuid), entityType, properties);
- subscriber.onNext(entityWrapper);
- }
+ } else if (token.equals(JsonToken.START_OBJECT)) {
+ tokenStack.push(token);
+
+ } else if (token.equals(JsonToken.END_OBJECT)) {
+ tokenStack.pop();
+ }
- // don't save it, but do keep track of last entity seen
- lastEntitySeenRef = new SimpleEntityRef( entityType, UUID.fromString(entityUuid) );
+ if (token.equals(JsonToken.END_ARRAY) && tokenStack.isEmpty()) {
+ done = true;
}
}
+ subscriber.onCompleted();
+
+ logger.debug("process(): done parsing JSON");
+
} catch (Exception e) {
// skip illegal entity UUID and go to next one
fileImport.setErrorMessage(e.getMessage());
@@ -1062,7 +1084,6 @@ public class ImportServiceImpl implements ImportService {
subscriber.onError(e);
}
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b22c37ae/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
index 178909b..742c33e 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
@@ -233,10 +233,10 @@ public class ImportServiceIT {
}
}
-// // all things should have been updated
-// for ( Entity e : importedThings ) {
-// assertTrue(e.getModified() > thingsMap.get(e.getUuid()).getModified());
-// }
+ // all things should have been updated
+ for ( Entity e : importedThings ) {
+ assertTrue(e.getModified() > thingsMap.get(e.getUuid()).getModified());
+ }
}
finally {