You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/02/09 17:28:46 UTC
incubator-usergrid git commit: Files are now downloaded by the
workers and not the master,
see the new S3ImportImpl and ImportServiceImpl.downloadAndImportFile().
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-import 2e05f2496 -> 18671e7a9
Files are now downloaded by the workers and not the master; see the new S3ImportImpl and ImportServiceImpl.downloadAndImportFile().
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/18671e7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/18671e7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/18671e7a
Branch: refs/heads/two-dot-o-import
Commit: 18671e7a9227babc4b868fc5dee0bbef0d4076ac
Parents: 2e05f24
Author: Dave Johnson <dm...@apigee.com>
Authored: Mon Feb 9 11:09:17 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Mon Feb 9 11:09:17 2015 -0500
----------------------------------------------------------------------
.../management/importer/FileImportJob.java | 2 +-
.../management/importer/ImportService.java | 2 +-
.../management/importer/ImportServiceImpl.java | 458 +++++++------------
.../usergrid/management/importer/S3Import.java | 8 +-
.../management/importer/S3ImportImpl.java | 224 ++-------
.../services/queues/ImportQueueListener.java | 2 +-
.../management/importer/ImportCollectionIT.java | 12 +-
.../management/importer/MockS3ImportImpl.java | 12 +-
8 files changed, 239 insertions(+), 481 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/18671e7a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
index 089c8e5..a77bb99 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
@@ -67,7 +67,7 @@ public class FileImportJob extends OnlyOnceJob {
jobExecution.heartbeat();
// call the File Parser for the file set in job execution
- importService.parseFileToEntities(jobExecution);
+ importService.downloadAndImportFile(jobExecution);
} catch ( Throwable t ) {
logger.debug("Error importing file", t);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/18671e7a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
index 58334c9..c464a18 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
@@ -48,7 +48,7 @@ public interface ImportService {
/**
* Parses the input file and creates entities
*/
- void parseFileToEntities(JobExecution jobExecution) throws Exception;
+ void downloadAndImportFile(JobExecution jobExecution) throws Exception;
/**
* Get the state for the Job with UUID
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/18671e7a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 51e9a09..f724536 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -17,18 +17,24 @@
package org.apache.usergrid.management.importer;
+import com.amazonaws.SDKGlobalConfiguration;
import org.apache.usergrid.batch.JobExecution;
import org.apache.usergrid.batch.service.SchedulerService;
import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.management.ApplicationInfo;
import org.apache.usergrid.management.ManagementService;
-import org.apache.usergrid.management.OrganizationInfo;
import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.entities.FileImport;
import org.apache.usergrid.persistence.entities.Import;
import org.apache.usergrid.persistence.entities.JobData;
-
+import org.apache.usergrid.persistence.index.query.Query.Level;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.QueueScopeFactory;
+import org.apache.usergrid.services.ServiceManagerFactory;
+import org.apache.usergrid.services.queues.ImportQueueListener;
+import org.apache.usergrid.services.queues.ImportQueueMessage;
import org.apache.usergrid.utils.InflectionUtils;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParser;
@@ -43,21 +49,10 @@ import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
+import javax.annotation.PostConstruct;
import java.io.File;
import java.util.*;
-import javax.annotation.PostConstruct;
-
-
-import org.apache.usergrid.persistence.index.query.Query.Level;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.QueueScopeFactory;
-import org.apache.usergrid.services.ServiceManagerFactory;
-import org.apache.usergrid.services.queues.ImportQueueListener;
-import org.apache.usergrid.services.queues.ImportQueueMessage;
-
public class ImportServiceImpl implements ImportService {
@@ -104,7 +99,7 @@ public class ImportServiceImpl implements ImportService {
}
/**
- * This schedules the main import Job
+ * This schedules the main import Job.
*
* @param config configuration of the job to be scheduled
* @return it returns the UUID of the scheduled job
@@ -153,7 +148,6 @@ public class ImportServiceImpl implements ImportService {
// schedule import job
sch.createJob(IMPORT_JOB_NAME, soonestPossible, jobData);
-
// update state for import job to created
importUG.setState(Import.State.SCHEDULED);
rootEm.update(importUG);
@@ -161,6 +155,7 @@ public class ImportServiceImpl implements ImportService {
return importUG.getUuid();
}
+
/**
* This schedules the sub FileImport Job
*
@@ -202,18 +197,18 @@ public class ImportServiceImpl implements ImportService {
fileImport.setState( FileImport.State.CREATED );
rootEm.update( fileImport );
- //set data to be transferred to the FileImport Job
+ // set data to be transferred to the FileImport Job
JobData jobData = new JobData();
jobData.setProperty( "File", file );
jobData.setProperty(FILE_IMPORT_ID, fileImport.getUuid());
jobData.addProperties(config);
-
-
- //update state of the job to Scheduled
+ // update state of the job to Scheduled
fileImport.setState(FileImport.State.SCHEDULED);
rootEm.update(fileImport);
+ rootEm.refreshIndex();
+
return jobData;
}
@@ -224,18 +219,14 @@ public class ImportServiceImpl implements ImportService {
*/
public JobData scheduleFileTasks( final JobData jobData ) {
-
long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
- // TODO SQS: Tear this part out and set the new job to be taken in here
// schedule file import job
- return sch.createJob( FILE_IMPORT_JOB_NAME, soonestPossible, jobData );
+ return sch.createJob(FILE_IMPORT_JOB_NAME, soonestPossible, jobData);
}
/**
* Query Entity Manager for the state of the Import Entity. This corresponds to the GET /import
- *
- * @return String
*/
@Override
public String getState(UUID uuid) throws Exception {
@@ -258,8 +249,6 @@ public class ImportServiceImpl implements ImportService {
/**
* Query Entity Manager for the error message generated for an import job.
- *
- * @return String
*/
@Override
public String getErrorMessage(final UUID uuid) throws Exception {
@@ -288,7 +277,6 @@ public class ImportServiceImpl implements ImportService {
*
* @param jobExecution the import job details
* @return Import Entity
- * @throws Exception
*/
@Override
public Import getImportEntity(final JobExecution jobExecution) throws Exception {
@@ -360,7 +348,7 @@ public class ImportServiceImpl implements ImportService {
/**
- * This method gets the files from s3 and also creates sub-jobs for each file i.e. File Import Jobs
+ * This method creates sub-jobs for each file i.e. File Import Jobs.
*
* @param jobExecution the job created by the scheduler with all the required config data
*/
@@ -369,26 +357,38 @@ public class ImportServiceImpl implements ImportService {
logger.debug("doImport()");
- Map<String, Object> config = (Map<String, Object>) jobExecution.getJobData().getProperty("importInfo");
- Object s3PlaceHolder = jobExecution.getJobData().getProperty("s3Import");
- S3Import s3Import = null;
-
+ Map<String, Object> config =
+ (Map<String, Object>) jobExecution.getJobData().getProperty("importInfo");
if (config == null) {
logger.error("doImport(): Import Information passed through is null");
return;
}
- //get the entity manager for the application, and the entity that this Import corresponds to.
- UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
+ Map<String, Object> properties =
+ (Map<String, Object>)config.get("properties");
+ Map<String, Object> storage_info =
+ (Map<String, Object>) properties.get("storage_info");
+
+ String bucketName = (String) storage_info.get("bucket_location");
+ String accessId = (String) storage_info.get( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR);
+ String secretKey = (String) storage_info.get( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR);
- EntityManager rooteEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
- Import rootImportTask = rooteEm.get(importId, Import.class);
+ // get Import Entity from the management app, update it to show that job has started
- //update the entity state to show that the job has officially started.
- rootImportTask.setState( Import.State.STARTED );
- rootImportTask.setStarted( System.currentTimeMillis() );
+ EntityManager emManagementApp = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+ UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
+ Import rootImportTask = emManagementApp.get(importId, Import.class);
+
+ rootImportTask.setState(Import.State.STARTED);
+ rootImportTask.setStarted(System.currentTimeMillis());
rootImportTask.setErrorMessage( " " );
- rooteEm.update(rootImportTask);
+ emManagementApp.update(rootImportTask);
+ logger.debug("doImport(): updated state");
+
+ // if no S3 importer was passed in then create one
+
+ S3Import s3Import;
+ Object s3PlaceHolder = jobExecution.getJobData().getProperty("s3Import");
try {
if (s3PlaceHolder != null) {
s3Import = (S3Import) s3PlaceHolder;
@@ -396,256 +396,117 @@ public class ImportServiceImpl implements ImportService {
s3Import = new S3ImportImpl();
}
} catch (Exception e) {
- logger.error("doImport(): S3Import doesn't exist");
- rootImportTask.setErrorMessage( e.getMessage() );
+ logger.error("doImport(): Error creating S3Import", e);
+ rootImportTask.setErrorMessage(e.getMessage());
rootImportTask.setState( Import.State.FAILED );
- rooteEm.update(rootImportTask);
+ emManagementApp.update(rootImportTask);
return;
}
- logger.debug("doImport(): updated state");
-
- final List<File> files;
+ // get list of all JSON files in S3 bucket
+ final List<String> bucketFiles;
try {
if (config.get("organizationId") == null) {
logger.error("doImport(): No organization could be found");
rootImportTask.setErrorMessage( "No organization could be found" );
rootImportTask.setState( Import.State.FAILED );
- rooteEm.update(rootImportTask);
+ emManagementApp.update(rootImportTask);
return;
} else {
if (config.get("applicationId") == null) {
-
throw new UnsupportedOperationException("Import applications not supported");
- // import All the applications from an organization
- //importApplicationsFromOrg(
- //(UUID) config.get("organizationId"), config, jobExecution, s3Import);
-
} else if (config.get("collectionName") == null) {
-
throw new UnsupportedOperationException("Import application not supported");
- // imports an Application from a single organization
- //importApplicationFromOrg( (UUID) config.get("organizationId"),
- // (UUID) config.get("applicationId"), config, jobExecution, s3Import);
-
} else {
-
- // imports a single collection from an app org combo
- files = importCollectionFromOrgApp(
- (UUID) config.get("organizationId"), (UUID) config.get("applicationId"),
- config, jobExecution, s3Import);
+ bucketFiles = s3Import.getBucketFileNames( bucketName, ".json", accessId, secretKey );
}
}
} catch (OrganizationNotFoundException | ApplicationNotFoundException e) {
rootImportTask.setErrorMessage( e.getMessage() );
rootImportTask.setState( Import.State.FAILED );
- rooteEm.update(rootImportTask);
+ emManagementApp.update(rootImportTask);
return;
}
- if (files.size() == 0) {
- rootImportTask.setState( Import.State.FINISHED );
- rootImportTask.setErrorMessage( "no files found in the bucket with the relevant context" );
- rooteEm.update(rootImportTask);
-
- } else {
- Map<String, Object> fileMetadata = new HashMap<String, Object>();
+ // schedule a FileImport job for each file found in the bucket
- ArrayList<Map<String, Object>> value = new ArrayList<Map<String, Object>>();
+ if ( bucketFiles.isEmpty() ) {
+ rootImportTask.setState( Import.State.FINISHED );
+ rootImportTask.setErrorMessage( "No files found in the bucket: " + bucketName );
+ emManagementApp.update(rootImportTask);
- final List<JobData> fileJobs = new ArrayList<>(files.size());
+ } else {
- //create the connection and set up metadata
- for (File file : files) {
+ Map<String, Object> fileMetadata = new HashMap<>();
+ ArrayList<Map<String, Object>> value = new ArrayList<>();
+ final List<JobData> fileJobs = new ArrayList<>(bucketFiles.size());
- // TODO SQS: replace the method inside here so that it uses sqs instead of internal q
- final JobData jobData = createFileTask( config, file.getPath(), rootImportTask );
+ // create the Entity Connection and set up metadata for each job
+ for ( String bucketFile : bucketFiles ) {
+ final JobData jobData = createFileTask( config, bucketFile, rootImportTask );
fileJobs.add( jobData) ;
-
}
- //schedule each job
- for(JobData jobData: fileJobs){
+ // schedule each job
+
+ for ( JobData jobData: fileJobs ) {
final JobData scheduled = scheduleFileTasks( jobData );
- Map<String, Object> fileJobID = new HashMap<String, Object>();
- fileJobID.put("FileName", scheduled.getProperty( "File" ));
- fileJobID.put("JobID", scheduled.getUuid());
+ Map<String, Object> fileJobID = new HashMap<>();
+ fileJobID.put("FileName", scheduled.getProperty( "File" ));
+ fileJobID.put("JobID", scheduled.getUuid());
value.add(fileJobID);
}
-
-
fileMetadata.put("files", value);
rootImportTask.addProperties( fileMetadata );
- rooteEm.update(rootImportTask);
- }
- }
-
-
- /**
- * Imports a specific collection from an org-app combo.
- */
- private List<File> importCollectionFromOrgApp(
- UUID organizationUUID, UUID applicationUUID, final Map<String, Object> config,
- final JobExecution jobExecution, S3Import s3Import) throws Exception {
-
- logger.debug("importCollectionFromOrgApp()");
-
- //retrieves import entity
- Import importUG = getImportEntity( jobExecution );
-
- ApplicationInfo application = managementService.getApplicationInfo(applicationUUID);
- if (application == null) {
- throw new ApplicationNotFoundException("Application Not Found");
- }
-
- OrganizationInfo organizationInfo = managementService.getOrganizationByUuid(organizationUUID);
- if (organizationInfo == null) {
- throw new OrganizationNotFoundException("Organization Not Found");
- }
-
- String collectionName = config.get("collectionName").toString();
-
- String appFileName = prepareCollectionInputFileName( organizationInfo.getName(), application.getName(),
- collectionName );
-
- return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.COLLECTION);
-
- }
-
- /**
- * Imports a specific applications from an organization
- */
- private List<File> importApplicationFromOrg(
- UUID organizationUUID, UUID applicationId, final Map<String, Object> config,
- final JobExecution jobExecution, S3Import s3Import) throws Exception {
-
- //retrieves import entity
- Import importUG = getImportEntity(jobExecution);
-
- ApplicationInfo application = managementService.getApplicationInfo( applicationId );
- if (application == null) {
- throw new ApplicationNotFoundException("Application Not Found");
- }
-
- OrganizationInfo organizationInfo = managementService.getOrganizationByUuid(organizationUUID);
- if (organizationInfo == null) {
- throw new OrganizationNotFoundException("Organization Not Found");
- }
-
- String appFileName = prepareApplicationInputFileName( organizationInfo.getName(), application.getName() );
-
- return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.APPLICATION);
-
- }
-
- /**
- * Imports All Applications from an Organization
- */
- private List<File> importApplicationsFromOrg(
- UUID organizationUUID, final Map<String, Object> config,
- final JobExecution jobExecution, S3Import s3Import) throws Exception {
-
- // retrieves import entity
- Import importUG = getImportEntity(jobExecution);
-
- OrganizationInfo organizationInfo = managementService.getOrganizationByUuid( organizationUUID );
- if (organizationInfo == null) {
- throw new OrganizationNotFoundException("Organization Not Found");
+ emManagementApp.update(rootImportTask);
}
-
- // prepares the prefix path for the files to be import depending on the endpoint being hit
- String appFileName = prepareOrganizationInputFileName( organizationInfo.getName() );
-
- return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.ORGANIZATION);
-
- }
-
-
- protected String prepareCollectionInputFileName(String orgName, String appName, String collectionName) {
- return orgName + "/" + appName + "." + collectionName + ".";
- }
-
-
- protected String prepareApplicationInputFileName(String orgName, String appName) {
- return orgName + "/" + appName + ".";
- }
-
-
- protected String prepareOrganizationInputFileName(String orgName) {
- return orgName + "/";
}
- /**
- * Copies file from S3.
- *
- * @param importUG Import instance
- * @param appFileName the base file name for the files to be downloaded
- * @param config the config information for the import job
- * @param s3Import s3import instance
- * @param type it indicates the type of import
- */
- public ArrayList<File> copyFileFromS3(Import importUG, String appFileName,
- Map<String, Object> config, S3Import s3Import, ImportType type) throws Exception {
-
- EntityManager rootEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
- ArrayList<File> copyFiles = new ArrayList<>();
-
- try {
- copyFiles = s3Import.copyFromS3(config, appFileName, type);
+ @Override
+ public void downloadAndImportFile(JobExecution jobExecution) throws Exception {
- } catch (Exception e) {
- logger.debug("Error copying from S3, continuing...", e);
- importUG.setErrorMessage(e.getMessage());
- importUG.setState(Import.State.FAILED);
- rootEm.update(importUG);
+ Map<String, Object> properties =
+ (Map<String, Object>)jobExecution.getJobData().getProperty("properties");
+ if (properties == null) {
+ logger.error("downloadAndImportFile(): Import Information passed through is null");
+ return;
}
- return copyFiles;
- }
-
+ Map<String, Object> storage_info =
+ (Map<String, Object>) properties.get("storage_info");
+ String bucketName = (String) storage_info.get("bucket_location");
+ String accessId = (String) storage_info.get( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR);
+ String secretKey = (String) storage_info.get( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR);
- @Override
- // TODO: ImportService should not have to know about JobExecution
- public void parseFileToEntities(JobExecution jobExecution) throws Exception {
-
- FileImport fileImport = getFileImportEntity( jobExecution );
- File file = new File(jobExecution.getJobData().getProperty("File").toString());
+ FileImport fileImport = getFileImportEntity(jobExecution);
+ String fileName = jobExecution.getJobData().getProperty("File").toString();
UUID targetAppId = (UUID) jobExecution.getJobData().getProperty("applicationId");
- parseFileToEntities( jobExecution, fileImport, file, targetAppId );
- }
-
-
- public void parseFileToEntities( final JobExecution execution, final FileImport fileImport, final File file,
- final UUID targetAppId ) throws Exception {
-
- logger.debug( "parseFileToEntities() for file {} ", file.getAbsolutePath() );
-
- EntityManager emManagementApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
- emManagementApp.update( fileImport );
+ logger.debug("downloadAndImportFile() for file {} ", fileName);
- //already done, nothing to do
- if ( FileImport.State.FAILED.equals( fileImport.getState() ) || FileImport.State.FINISHED
- .equals( fileImport.getState() ) ) {
+ if ( FileImport.State.FAILED.equals( fileImport.getState() )
+ || FileImport.State.FINISHED .equals(fileImport.getState()) ) {
return;
}
+ // update FileImport Entity to indicate that we have started
- // mark the File import job as started
+ EntityManager emManagementApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
+ emManagementApp.update( fileImport );
fileImport.setState( FileImport.State.STARTED );
emManagementApp.update( fileImport );
@@ -653,64 +514,70 @@ public class ImportServiceImpl implements ImportService {
throw new IllegalArgumentException( "Application does not exist: " + targetAppId.toString() );
}
EntityManager targetEm = emf.getEntityManager( targetAppId );
- logger.debug( " importing into app {} file {}", targetAppId.toString(), file.getAbsolutePath() );
- importEntitiesFromFile( execution, file, targetEm, emManagementApp, fileImport );
+ // download file from S3, if no S3 importer was passed in then create one
+ S3Import s3Import;
+ Object s3PlaceHolder = jobExecution.getJobData().getProperty("s3Import");
+ try {
+ if (s3PlaceHolder != null) {
+ s3Import = (S3Import) s3PlaceHolder;
+ } else {
+ s3Import = new S3ImportImpl();
+ }
+ } catch (Exception e) {
+ logger.error("doImport(): Error creating S3Import", e);
+ fileImport.setErrorMessage(e.getMessage());
+ fileImport.setState( FileImport.State.FAILED );
+ emManagementApp.update(fileImport);
+ return;
+ }
+ File downloadedFile = s3Import.copyFileFromBucket(
+ fileName, bucketName, accessId, secretKey );
- // Updates the state of file import job
-
+ // parse JSON data, create Entities and Connections from import data
- // check other files status and mark the status of
- // import Job as Finished if all others are finished
- Results ImportJobResults =
- emManagementApp.getConnectingEntities( fileImport, "includes", null, Level.ALL_PROPERTIES );
+ parseEntitiesAndConnectionsFromJson(
+ jobExecution, downloadedFile, targetEm, emManagementApp, fileImport);
- List<Entity> importEntity = ImportJobResults.getEntities();
- UUID importId = importEntity.get( 0 ).getUuid();
- Import importUG = emManagementApp.get( importId, Import.class );
+ // mark ImportJob FINISHED but only if all other FileImportJobs are complete
- Results entities = emManagementApp.getConnectedEntities( importUG, "includes", null, Level.ALL_PROPERTIES );
+ // get parent import job of this file import job
+ Results importJobResults =
+ emManagementApp.getConnectingEntities( fileImport, "includes", null, Level.ALL_PROPERTIES );
+ List<Entity> importEntities = importJobResults.getEntities();
+ UUID importId = importEntities.get( 0 ).getUuid();
+ Import importEntity = emManagementApp.get( importId, Import.class );
+ // get all file import job siblings of the current job we're working now
+ Results entities = emManagementApp.getConnectedEntities( importEntity, "includes", null, Level.ALL_PROPERTIES );
PagingResultsIterator itr = new PagingResultsIterator( entities );
-
int failCount = 0;
-
while ( itr.hasNext() ) {
FileImport fi = ( FileImport ) itr.next();
-
-
switch ( fi.getState() ) {
- //failed, but we may not be complete so continue checking
- case FAILED:
+ case FAILED: // failed, but we may not be complete so continue checking
failCount++;
break;
- //finished, we can continue checking
- case FINISHED:
+ case FINISHED: // finished, we can continue checking
break;
- //not something we recognize as complete, short circuit
- default:
+ default: // not something we recognize as complete, short circuit
return;
}
}
-
if ( failCount == 0 ) {
- importUG.setState( Import.State.FINISHED );
- }
-
- //we had failures, set it to failed
- else {
- importUG.setState( Import.State.FAILED );
+ importEntity.setState(Import.State.FINISHED);
+ } else {
+ // we had failures, set it to failed
+ importEntity.setState(Import.State.FAILED);
}
- emManagementApp.update( importUG );
+ emManagementApp.update( importEntity );
}
-
-
/**
* Gets the JSON parser for given file
*
@@ -726,13 +593,13 @@ public class ImportServiceImpl implements ImportService {
/**
* Imports the entity's connecting references (collections, connections and dictionaries)
*
- * @param execution The job execution currently running
+ * @param execution The job execution currently running
* @param file The file to be imported
* @param em Entity Manager for the application being imported
* @param rootEm Entity manager for the root applicaition
* @param fileImport The file import entity
*/
- private void importEntitiesFromFile(
+ private void parseEntitiesAndConnectionsFromJson(
final JobExecution execution,
final File file,
final EntityManager em,
@@ -781,21 +648,30 @@ public class ImportServiceImpl implements ImportService {
// potentially skip the first n if this is a resume operation
final int entityNumSkip = (int)tracker.getTotalEntityCount();
- final int entityCount = entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
- @Override
- public Boolean call( final WriteEvent writeEvent ) {
- return !tracker.shouldStopProcessingEntities();
- }
- } ).skip(entityNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
- @Override
- public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
- return entityWrapperObservable.doOnNext(doWork);
- }
- }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+ // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
+// final int entityCount = entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
+// @Override
+// public Boolean call( final WriteEvent writeEvent ) {
+// return !tracker.shouldStopProcessingEntities();
+// }
+// } ).skip(entityNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+// @Override
+// public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+// return entityWrapperObservable.doOnNext(doWork);
+// }
+// }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+
+ entityEventObservable.parallel(
+ new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+ @Override
+ public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+ return entityWrapperObservable.doOnNext(doWork);
+ }
+ }, Schedulers.io()).toBlocking().last();
jp.close();
- logger.debug("\n\nimportEntitiesFromFile(): Wrote entities\n");
+ logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): Wrote entities\n");
// SECOND PASS: import all connections and dictionaries
@@ -816,21 +692,31 @@ public class ImportServiceImpl implements ImportService {
// potentially skip the first n if this is a resume operation
final int connectionNumSkip = (int)tracker.getTotalConnectionCount();
- final int connectionCount = otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
- @Override
- public Boolean call( final WriteEvent writeEvent ) {
- return !tracker.shouldStopProcessingConnections();
- }
- } ).skip(connectionNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
- @Override
- public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
- return entityWrapperObservable.doOnNext(doWork);
- }
- }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+ // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
+// final int connectionCount = otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
+// @Override
+// public Boolean call( final WriteEvent writeEvent ) {
+// return !tracker.shouldStopProcessingConnections();
+// }
+// } ).skip(connectionNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+// @Override
+// public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+// return entityWrapperObservable.doOnNext(doWork);
+// }
+// }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+
+ otherEventObservable.parallel(
+ new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+ @Override
+ public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+ return entityWrapperObservable.doOnNext(doWork);
+ }
+ }, Schedulers.io()).toBlocking().last();
jp.close();
- logger.debug("\n\nimportEntitiesFromFile(): Wrote others\n");
+ logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): Wrote others for file {}\n",
+ fileImport.getFileName());
// flush the job statistics
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/18671e7a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
index b0a0720..7e09051 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
@@ -18,7 +18,9 @@
package org.apache.usergrid.management.importer;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
@@ -27,7 +29,9 @@ import java.util.Map;
*/
public interface S3Import {
- ArrayList<File> copyFromS3(
- Map<String, Object> exportInfo, String filename, ImportService.ImportType type);
+ List<String> getBucketFileNames(
+ String bucketName, String endsWith, String accessId, String secretKey ) throws Exception;
+ public File copyFileFromBucket(
+ String blobFileName, String bucketName, String accessId, String secretKey ) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/18671e7a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
index 0fefeb0..f10eea9 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
@@ -20,6 +20,8 @@ package org.apache.usergrid.management.importer;
import com.amazonaws.SDKGlobalConfiguration;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Module;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.lucene.document.StringField;
import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
@@ -35,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.*;
@@ -44,29 +47,10 @@ public class S3ImportImpl implements S3Import {
private static final Logger logger = LoggerFactory.getLogger(S3ImportImpl.class);
- /**
- * Downloads the files from s3 into temp local files.
- *
- * @param importInfo the information entered by the user required to perform import from S3
- * @param filenamePrefix generated based on the request URI
- * @param type indicates the type of import
- * @return An ArrayList of files i.e. the files downloaded from s3
- */
- public ArrayList<File> copyFromS3(
- final Map<String, Object> importInfo, String filenamePrefix, ImportService.ImportType type) {
+ public File copyFileFromBucket(
+ String blobFileName, String bucketName, String accessId, String secretKey ) throws Exception {
- logger.debug("copyFileFromS3(): copying file={} type={}", filenamePrefix, type.toString());
-
- ArrayList<File> files = new ArrayList<>();
-
- Map<String, Object> properties = (Map<String, Object>) importInfo.get("properties");
-
- Map<String, Object> storage_info = (Map<String, Object>) properties.get("storage_info");
-
- String bucketName = (String) storage_info.get("bucket_location");
- //TODO: have this support the alternate configurations as well
- String accessId = (String) storage_info.get( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR);
- String secretKey = (String) storage_info.get(SDKGlobalConfiguration.SECRET_KEY_ENV_VAR);
+ // setup to use JCloud BlobStore interface to AWS S3
Properties overrides = new Properties();
overrides.setProperty("s3" + ".identity", accessId);
@@ -82,182 +66,64 @@ public class S3ImportImpl implements S3Import {
.modules(MODULES)
.overrides(overrides)
.buildView(BlobStoreContext.class);
+ BlobStore blobStore = context.getBlobStore();
- try {
-
- BlobStore blobStore = context.getBlobStore();
-
- // gets all the files in the bucket recursively
- PageSet<? extends StorageMetadata> pageSet =
- blobStore.list(bucketName, new ListContainerOptions().recursive());
-
- logger.debug(" Found {} files in bucket {}", pageSet.size(), bucketName);
-
- Iterator itr = pageSet.iterator();
-
- while (itr.hasNext()) {
-
- String blobStoreFileName = ((MutableBlobMetadata) itr.next()).getName();
- ParsedFileName pfn = new ParsedFileName(blobStoreFileName);
-
- switch (type) {
-
- // collection file in format <org_name>/<app_name>.<collection_name>.[0-9]+.json
- case COLLECTION: {
- List<String> errors = new ArrayList<>();
- if (pfn.organizationName == null) {
- errors.add("Filename does not specify organization name");
- }
- if (pfn.applicationName == null) {
- errors.add("Filename does not specify application name");
- }
- if (pfn.collectionName == null) {
- errors.add("Filename does not specify collection name");
-
- // we shouldn't care what collection name is specified in the import file.
-// } else if (!pfn.collectionName.equals(importInfo.get("collectionName"))) {
-// errors.add("Collection name in input file should be " + pfn.collectionName);
- }
- if (!errors.isEmpty()) {
- throw new IllegalArgumentException("Input errors " + errors.toString());
- }
- files.add(copyFile(blobStore, bucketName, blobStoreFileName));
- break;
- }
-
- // application file in format <org_name>/<app_name>.[0-9]+.json
- case APPLICATION: {
- List<String> errors = new ArrayList<>();
- if (pfn.organizationName == null) {
- errors.add("Filename does not specify organization name");
- }
- if (pfn.applicationName == null) {
- errors.add("Filename does not specify application name");
- }
- if (!errors.isEmpty()) {
- throw new IllegalArgumentException("Input errors " + errors.toString());
- }
-
- files.add(copyFile(blobStore, bucketName, blobStoreFileName));
- break;
- }
-
- // is an application file in format <org_name>/[-a-zA-Z0-9]+.[0-9]+.json
- case ORGANIZATION: {
- List<String> errors = new ArrayList<>();
- if (pfn.organizationName == null) {
- errors.add("Filename does not specify organization name");
- }
- if (!errors.isEmpty()) {
- throw new IllegalArgumentException("Input errors " + errors.toString());
- }
- files.add(copyFile(blobStore, bucketName, blobStoreFileName));
- break;
- }
-
- default: {
- throw new IllegalArgumentException(
- "Unrecognized import type " + type.toString());
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- logger.debug(" Returning {} files", files.size());
- return files;
- }
-
-
- /**
- * Copy the file from s3 into a temp local file.
- *
- * @param bucketName the S3 bucket name from where files need to be imported
- * @param fileName the filename by which the temp file should be created
- */
- private File copyFile(BlobStore blobStore, String bucketName, String fileName) throws IOException {
-
- Blob blob = blobStore.getBlob(bucketName, fileName);
+ // get file from configured bucket, copy it to local temp file
- String[] fileOrg = fileName.split("/");
- File organizationDirectory = new File(fileOrg[0]);
-
- if (!organizationDirectory.exists()) {
- try {
- organizationDirectory.mkdir();
-
- } catch (SecurityException se) {
- logger.error(se.getMessage());
- }
+ Blob blob = blobStore.getBlob(bucketName, blobFileName);
+ if ( blob == null) {
+ throw new RuntimeException(
+ "Blob file name " + blobFileName + " not found in bucket " + bucketName );
}
-
- File ephemeral = new File(fileName);
- FileOutputStream fop = new FileOutputStream(ephemeral);
+ File tempFile = File.createTempFile( bucketName, RandomStringUtils.randomAlphabetic(10));
+ FileOutputStream fop = new FileOutputStream(tempFile);
blob.getPayload().writeTo(fop);
fop.close();
+ tempFile.deleteOnExit();
- organizationDirectory.deleteOnExit();
- ephemeral.deleteOnExit();
-
- return ephemeral;
+ return tempFile;
}
- /**
- * Break filename down into parts.
- */
- class ParsedFileName {
- String fileName;
- String applicationName;
- String collectionName;
- String organizationName;
- long fileNumber = -1L;
-
- public ParsedFileName(String fileName) {
+ @Override
+ public List<String> getBucketFileNames(
+ String bucketName, String endsWith, String accessId, String secretKey ) {
- this.fileName = fileName;
+ // get setup to use JCloud BlobStore interface to AWS S3
- if (fileName.endsWith("\\.json")) {
- logger.debug("Bad filename " + fileName);
- throw new IllegalArgumentException("Import filenames must end with .json");
- }
+ Properties overrides = new Properties();
+ overrides.setProperty("s3" + ".identity", accessId);
+ overrides.setProperty("s3" + ".credential", secretKey);
- if (fileName.contains("/")) {
- String[] parts = fileName.split("/");
- organizationName = parts[0];
+ final Iterable<? extends Module> MODULES = ImmutableSet.of(
+ new JavaUrlHttpCommandExecutorServiceModule(),
+ new Log4JLoggingModule(),
+ new NettyPayloadModule());
- if (parts.length > 1) {
- String[] secondParts = parts[1].split("\\.");
- applicationName = secondParts[0];
+ BlobStoreContext context = ContextBuilder.newBuilder("s3")
+ .credentials(accessId, secretKey)
+ .modules(MODULES)
+ .overrides(overrides)
+ .buildView(BlobStoreContext.class);
+ BlobStore blobStore = context.getBlobStore();
- if (secondParts.length > 1) {
- collectionName = secondParts[1];
- }
+ // gets all the files in the configured bucket recursively
- if (secondParts.length > 2) {
- fileNumber = Long.parseLong(secondParts[2]);
- }
- }
- }
+ PageSet<? extends StorageMetadata> pageSets =
+ blobStore.list(bucketName, new ListContainerOptions().recursive());
+ logger.debug(" Found {} files in bucket {}", pageSets.size(), bucketName);
- if (applicationName == null
- && collectionName == null
- && organizationName == null) {
- throw new IllegalArgumentException("Unable to parse import filename " + fileName);
+ List<String> blobFileNames = new ArrayList<>();
+ for ( Object pageSet : pageSets ) {
+ String blobFileName = ((MutableBlobMetadata)pageSet).getName();
+ if ( blobFileName.endsWith( endsWith )) {
+ blobFileNames.add(blobFileName);
}
-
- logger.debug("Parsed " + toString());
}
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("org: ").append(organizationName);
- sb.append(" app: ").append(applicationName);
- sb.append(" col: ").append(collectionName);
- sb.append(" num: ").append(fileNumber);
- return sb.toString();
- }
+ return blobFileNames;
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/18671e7a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
index 0757863..9223ac2 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
@@ -75,7 +75,7 @@ public class ImportQueueListener extends QueueListener {
for (QueueMessage message : messages) {
ImportQueueMessage queueMessage = ( ImportQueueMessage ) message.getBody();
-// TODO We still need to hide this queue behind the scheduler importService.parseFileToEntities( queueMessage );
+// TODO We still need to hide this queue behind the scheduler importService.downloadAndImportFile( queueMessage );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/18671e7a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index ef335c8..9a8c977 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@ -401,7 +401,6 @@ public class ImportCollectionIT {
logger.debug("\n\nImport into new app {}\n", em.getApplication().getName() );
-
ImportService importService = setup.getImportService();
UUID importUUID = importService.schedule( new HashMap<String, Object>() {{
put( "path", organization.getName() + em.getApplication().getName());
@@ -420,15 +419,13 @@ public class ImportCollectionIT {
}});
}});
- // listener.start();
-
-// int maxRetries = 20;
-// int retries = 0;
- while ( !importService.getState( importUUID ).equals( "FINISHED" ) ) {
+ int maxRetries = 20;
+ int retries = 0;
+ while ( !importService.getState( importUUID ).equals( "FINISHED" ) && retries++ < maxRetries ) {
+ logger.debug("Waiting for import...");
Thread.sleep(1000);
}
-
em.refreshIndex();
}
@@ -465,6 +462,7 @@ public class ImportCollectionIT {
int maxRetries = 20;
int retries = 0;
while ( !exportService.getState( exportUUID ).equals( "FINISHED" ) && retries++ < maxRetries ) {
+ logger.debug("Waiting for export...");
Thread.sleep(1000);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/18671e7a/stack/services/src/test/java/org/apache/usergrid/management/importer/MockS3ImportImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/MockS3ImportImpl.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/MockS3ImportImpl.java
index caf7fd2..16096b6 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/MockS3ImportImpl.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/MockS3ImportImpl.java
@@ -21,18 +21,22 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
-import java.util.Map;
+import java.util.List;
public class MockS3ImportImpl implements S3Import{
private static final Logger logger = LoggerFactory.getLogger(MockS3ImportImpl.class);
@Override
- public ArrayList<File> copyFromS3(
- final Map<String,Object> exportInfo, String filename , ImportService.ImportType type ) {
- logger.info("Copying from S3 file {} with import type {}", filename, type.toString());
+ public List<String> getBucketFileNames(String bucketName, String endsWith, String accessId, String secretKey) {
return new ArrayList<>();
}
+ @Override
+ public File copyFileFromBucket(String blobFileName, String bucketName, String accessId, String secretKey) throws IOException {
+ return File.createTempFile("test","tmp");
+ }
+
}