You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/15 16:01:38 UTC

[1/2] Completing rename of "importUG" package to "importer" to avoid capital letters in package name.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-import 52dbfe43a -> 5d6c4accc


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
new file mode 100644
index 0000000..400439a
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -0,0 +1,977 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.management.importer;
+
+import org.apache.usergrid.batch.JobExecution;
+import org.apache.usergrid.batch.service.SchedulerService;
+import org.apache.usergrid.management.ApplicationInfo;
+import org.apache.usergrid.management.ManagementService;
+import org.apache.usergrid.management.OrganizationInfo;
+import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.entities.FileImport;
+import org.apache.usergrid.persistence.entities.Import;
+import org.apache.usergrid.persistence.entities.JobData;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.usergrid.corepersistence.CpEntityManagerFactory.MANAGEMENT_APPLICATION_ID;
+import org.apache.usergrid.persistence.index.query.Query.Level;
+
+
+public class ImportServiceImpl implements ImportService {
+
+    // JobData property key under which schedule() stores the Import entity's UUID
+    public static final String IMPORT_ID = "importId";
+    // job name passed to the scheduler when the main import job is created
+    public static final String IMPORT_JOB_NAME = "importJob";
+    // JobData property key under which scheduleFile() stores the FileImport entity's UUID
+    public static final String FILE_IMPORT_ID = "fileImportId";
+    // job name passed to the scheduler when a per-file import job is created
+    public static final String FILE_IMPORT_JOB_NAME = "fileImportJob";
+
+    //Amount of time that has passed before sending another heart beat in millis
+    // NOTE(review): not referenced in the visible part of this class - confirm it is still used
+    public static final int TIMESTAMP_DELTA = 5000;
+
+    private static final Logger logger = LoggerFactory.getLogger(ImportServiceImpl.class);
+
+    // Entity Manager Factory, injected through setEmf()
+    protected EntityManagerFactory emf;
+    // files returned by the most recent fileTransfer() call; assigned by the import*FromOrg* methods
+    private ArrayList<File> files;
+
+    // scheduler service, injected through setSch()
+    private SchedulerService sch;
+
+    // Management Service used to look up organization and application info, injected through setManagementService()
+    private ManagementService managementService;
+    // factory used to create the JSON parsers for the import files
+    private JsonFactory jsonFactory = new JsonFactory();
+
+    // NOTE(review): not referenced in the visible part of this class - confirm it is still needed
+    private int entityCount = 0;
+
+    /**
+     * Schedules the main import job.
+     *
+     * Makes sure the "imports" collection exists in the management application, creates an
+     * Import entity that tracks the job, hands the job to the scheduler and finally marks the
+     * entity as SCHEDULED.
+     *
+     * @param config configuration of the job to be scheduled; passed to the job as "importInfo"
+     * @return the UUID of the Import entity tracking the scheduled job, or null when config is
+     *         null, the management application is not available or the entity cannot be created
+     * @throws Exception if updating the Import entity or creating the scheduler job fails
+     */
+    @Override
+    public UUID schedule(Map<String, Object> config) throws Exception {
+
+        if (config == null) {
+            logger.error("import information cannot be null");
+            return null;
+        }
+
+        EntityManager rootEm = null;
+        try {
+            rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+            Set<String> collections = rootEm.getApplicationCollections();
+            if (!collections.contains("imports")) {
+                rootEm.createApplicationCollection("imports");
+            }
+        } catch (Exception e) {
+            // keep the cause in the log; the caller only sees a null return value
+            logger.error("application doesn't exist within the current context", e);
+            return null;
+        }
+
+        Import importUG = new Import();
+
+        // create the import entity to store all metadata about the import job
+        try {
+            importUG = rootEm.create(importUG);
+        } catch (Exception e) {
+            logger.error("Import entity creation failed", e);
+            return null;
+        }
+
+        // update state for import job to created
+        importUG.setState(Import.State.CREATED);
+        rootEm.update(importUG);
+
+        // set data to be transferred to importInfo
+        JobData jobData = new JobData();
+        jobData.setProperty("importInfo", config);
+        jobData.setProperty(IMPORT_ID, importUG.getUuid());
+
+        long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
+
+        // schedule import job
+        sch.createJob(IMPORT_JOB_NAME, soonestPossible, jobData);
+
+        // update state for import job to scheduled
+        importUG.setState(Import.State.SCHEDULED);
+        rootEm.update(importUG);
+
+        return importUG.getUuid();
+    }
+
+    /**
+     * Schedules a FileImport sub-job for a single file of an import.
+     *
+     * Creates a FileImport entity in the management application, connects it to the parent
+     * Import entity with an "includes" connection, hands the job to the scheduler and marks
+     * the entity as SCHEDULED.
+     *
+     * @param file path of the file to be imported; passed to the job under the "File" property
+     * @param importRef reference to the parent Import entity
+     * @return the UUID of the FileImport entity, or null when the management entity manager
+     *         is not available or the connection to the parent Import cannot be created
+     * @throws Exception if creating/updating the FileImport entity or creating the scheduler job fails
+     */
+    public UUID scheduleFile(String file, EntityRef importRef) throws Exception {
+
+        EntityManager rootEm = null;
+
+        try {
+            rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+        } catch (Exception e) {
+            // keep the cause in the log; the caller only sees a null return value
+            logger.error("application doesn't exist within the current context", e);
+            return null;
+        }
+
+        // create a FileImport entity to store metadata about the fileImport job;
+        // it is persisted in state CREATED, " " is the placeholder for "no value yet"
+        FileImport fileImport = new FileImport();
+
+        fileImport.setFileName(file);
+        fileImport.setCompleted(false);
+        fileImport.setLastUpdatedUUID(" ");
+        fileImport.setErrorMessage(" ");
+        fileImport.setState(FileImport.State.CREATED);
+        fileImport = rootEm.create(fileImport);
+
+        Import importUG = rootEm.get(importRef, Import.class);
+
+        try {
+            // create a connection between the main import job and the sub FileImport Job
+            rootEm.createConnection(importUG, "includes", fileImport);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return null;
+        }
+
+        //set data to be transferred to the FileImport Job
+        JobData jobData = new JobData();
+        jobData.setProperty("File", file);
+        jobData.setProperty(FILE_IMPORT_ID, fileImport.getUuid());
+
+        long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
+
+        //schedule file import job
+        sch.createJob(FILE_IMPORT_JOB_NAME, soonestPossible, jobData);
+
+        //update state of the job to Scheduled
+        fileImport.setState(FileImport.State.SCHEDULED);
+        rootEm.update(fileImport);
+
+        return fileImport.getUuid();
+    }
+
+    /**
+     * Looks up the Import entity with the given UUID in the management application and
+     * reports its state. This corresponds to the GET /import
+     *
+     * @param uuid UUID of the Import entity
+     * @return the state of the import as a string, or a descriptive message when the UUID is
+     *         null or no Import entity with that UUID exists
+     * @throws Exception if the entity cannot be read
+     */
+    @Override
+    public String getState(UUID uuid) throws Exception {
+        if (uuid != null) {
+            // retrieve the import entity from the management application
+            final Import found = emf.getEntityManager(MANAGEMENT_APPLICATION_ID).get(uuid, Import.class);
+
+            if (found != null) {
+                return found.getState().toString();
+            }
+
+            logger.error("no entity with that uuid was found");
+            return "No Such Element found";
+        }
+
+        logger.error("UUID passed in cannot be null.");
+        return "UUID passed in cannot be null";
+    }
+
+    /**
+     * Query Entity Manager for the error message generated for an import job.
+     *
+     * @param uuid UUID of the Import entity
+     * @return the error message stored on the Import entity; " " (the placeholder this class
+     *         uses for "no error") when no message has been stored yet; or a descriptive
+     *         message when the UUID is null or no Import entity with that UUID exists
+     * @throws Exception if the entity cannot be read
+     */
+    @Override
+    public String getErrorMessage(final UUID uuid) throws Exception {
+
+        if (uuid == null) {
+            logger.error("UUID passed in cannot be null.");
+            return "UUID passed in cannot be null";
+        }
+
+        //get the management application entity manager
+        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+
+        //retrieve the import entity.
+        Import importUG = rootEm.get(uuid, Import.class);
+
+        if (importUG == null) {
+            logger.error("no entity with that uuid was found");
+            return "No Such Element found";
+        }
+
+        // schedule() does not set an error message, only doImport() does, so an import that
+        // has not started yet may have none; avoid the NullPointerException in that case
+        Object message = importUG.getErrorMessage();
+        return message == null ? " " : message.toString();
+    }
+
+    /**
+     * Loads the Import entity that holds the meta-data of an import job.
+     * The entity's UUID is read from the job data under the IMPORT_ID key.
+     *
+     * @param jobExecution the import job details
+     * @return the Import entity as returned by the management application's entity manager
+     * @throws Exception if the entity cannot be read
+     */
+    @Override
+    public Import getImportEntity(final JobExecution jobExecution) throws Exception {
+
+        final JobData data = jobExecution.getJobData();
+        final UUID id = (UUID) data.getProperty(IMPORT_ID);
+
+        return emf.getEntityManager(MANAGEMENT_APPLICATION_ID).get(id, Import.class);
+    }
+
+
+    /**
+     * Loads the FileImport entity that holds the meta-data of a file import sub-job.
+     * The entity's UUID is read from the job data under the FILE_IMPORT_ID key.
+     *
+     * @param jobExecution the file import job details
+     * @return the FileImport entity as returned by the management application's entity manager
+     * @throws Exception if the entity cannot be read
+     */
+    @Override
+    public FileImport getFileImportEntity(final JobExecution jobExecution) throws Exception {
+
+        final JobData data = jobExecution.getJobData();
+        final UUID id = (UUID) data.getProperty(FILE_IMPORT_ID);
+
+        return emf.getEntityManager(MANAGEMENT_APPLICATION_ID).get(id, FileImport.class);
+    }
+
+    /**
+     * Returns the temporary files downloaded from S3.
+     *
+     * @return the list assigned by the most recent import*FromOrg* call; null if none of
+     *         those methods has assigned it yet (the field has no initial value)
+     */
+    @Override
+    public ArrayList<File> getEphemeralFile() {
+        return files;
+    }
+
+    /** @return the scheduler service used to create the import jobs */
+    public SchedulerService getSch() {
+        return this.sch;
+    }
+
+
+    /** @param sch the scheduler service used to create the import jobs */
+    public void setSch(final SchedulerService sch) {
+        this.sch = sch;
+    }
+
+
+    /** @return the entity manager factory used to obtain entity managers */
+    public EntityManagerFactory getEmf() {
+        return this.emf;
+    }
+
+
+    /** @param emf the entity manager factory used to obtain entity managers */
+    public void setEmf(final EntityManagerFactory emf) {
+        this.emf = emf;
+    }
+
+
+    /** @return the management service used to look up organizations and applications */
+    public ManagementService getManagementService() {
+        return this.managementService;
+    }
+
+
+    /** @param managementService the management service used to look up organizations and applications */
+    public void setManagementService(final ManagementService managementService) {
+        this.managementService = managementService;
+    }
+
+    /**
+     * Runs the main import job: gets the files from S3 and creates a sub-job (File Import Job)
+     * for each of them.
+     *
+     * Depending on which of "organizationId", "applicationId" and "collectionName" are present
+     * in the import configuration, either all applications of an organization, a single
+     * application or a single collection are imported. The resulting file names and job ids
+     * are stored on the Import entity under the "files" property.
+     *
+     * @param jobExecution the job created by the scheduler with all the required config data
+     * @throws Exception if reading or updating the Import entity fails, or if a file import
+     *                   job cannot be scheduled
+     */
+    @Override
+    public void doImport(JobExecution jobExecution) throws Exception {
+
+        Map<String, Object> config = (Map<String, Object>) jobExecution.getJobData().getProperty("importInfo");
+        Object s3PlaceHolder = jobExecution.getJobData().getProperty("s3Import");
+        S3Import s3Import = null;
+
+        if (config == null) {
+            logger.error("Import Information passed through is null");
+            return;
+        }
+
+        //get the entity manager for the application, and the entity that this Import corresponds to.
+        UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
+
+        EntityManager rooteEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+        Import importUG = rooteEm.get(importId, Import.class);
+
+        //update the entity state to show that the job has officially started.
+        importUG.setState(Import.State.STARTED);
+        importUG.setStarted(System.currentTimeMillis());
+        importUG.setErrorMessage(" ");
+        rooteEm.update(importUG);
+        try {
+            // an S3Import passed through the job data takes precedence over the default implementation
+            if (s3PlaceHolder != null) {
+                s3Import = (S3Import) s3PlaceHolder;
+            } else {
+                s3Import = new S3ImportImpl();
+            }
+        } catch (Exception e) {
+            logger.error("S3Import doesn't exist", e);
+            importUG.setErrorMessage(e.getMessage());
+            importUG.setState(Import.State.FAILED);
+            rooteEm.update(importUG);
+            return;
+        }
+
+        try {
+
+            if (config.get("organizationId") == null) {
+                logger.error("No organization could be found");
+                importUG.setErrorMessage("No organization could be found");
+                importUG.setState(Import.State.FAILED);
+                rooteEm.update(importUG);
+                return;
+            } else if (config.get("applicationId") == null) {
+                //import All the applications from an organization
+                importApplicationsFromOrg((UUID) config.get("organizationId"), config, jobExecution, s3Import);
+            } else if (config.get("collectionName") == null) {
+                //imports an Application from a single organization
+                importApplicationFromOrg((UUID) config.get("organizationId"), (UUID) config.get("applicationId"), config, jobExecution, s3Import);
+            } else {
+                //imports a single collection from an app org combo
+                importCollectionFromOrgApp((UUID) config.get("applicationId"), config, jobExecution, s3Import);
+            }
+
+        } catch (OrganizationNotFoundException e) {
+            // NOTE(review): a missing organization ends in FINISHED (with an error message) while a
+            // missing organizationId above ends in FAILED - confirm this difference is intended
+            importUG.setErrorMessage(e.getMessage());
+            importUG.setState(Import.State.FINISHED);
+            rooteEm.update(importUG);
+            return;
+        } catch (ApplicationNotFoundException e) {
+            importUG.setErrorMessage(e.getMessage());
+            importUG.setState(Import.State.FINISHED);
+            rooteEm.update(importUG);
+            return;
+        }
+
+        // fileTransfer() records an S3 failure on its own copy of the Import entity. Re-read the
+        // entity so that the FAILED state and its error message are not overwritten below with
+        // FINISHED / "no files found".
+        Import refreshed = rooteEm.get(importId, Import.class);
+        if (refreshed != null) {
+            importUG = refreshed;
+        }
+        if (Import.State.FAILED.toString().equals(String.valueOf(importUG.getState()))) {
+            logger.error("file transfer failed for import " + importId + ", no file import jobs were scheduled");
+            return;
+        }
+
+        if (files == null || files.isEmpty()) {
+
+            importUG.setState(Import.State.FINISHED);
+            importUG.setErrorMessage("no files found in the bucket with the relevant context");
+            rooteEm.update(importUG);
+
+        } else {
+
+            Map<String, Object> fileMetadata = new HashMap<String, Object>();
+
+            ArrayList<Map<String, Object>> value = new ArrayList<Map<String, Object>>();
+
+            // schedule each file as a separate job
+            for (File eachfile : files) {
+
+                UUID jobID = scheduleFile(eachfile.getPath(), importUG);
+                if (jobID == null) {
+                    // scheduleFile() signals failure with null; fail with a meaningful message
+                    // instead of a bare NullPointerException
+                    throw new IllegalStateException(
+                            "Failed to schedule file import job for " + eachfile.getName());
+                }
+                Map<String, Object> fileJobID = new HashMap<String, Object>();
+                fileJobID.put("FileName", eachfile.getName());
+                fileJobID.put("JobID", jobID.toString());
+                value.add(fileJobID);
+            }
+
+            fileMetadata.put("files", value);
+            importUG.addProperties(fileMetadata);
+            rooteEm.update(importUG);
+        }
+    }
+
+    /**
+     * Imports one collection of an application.
+     *
+     * @param applicationUUID UUID of the application the collection belongs to
+     * @param config the import configuration; must contain "collectionName"
+     * @param jobExecution the import job details, used to load the Import entity
+     * @param s3Import the S3 import implementation used to download the files
+     * @throws Exception ApplicationNotFoundException when the application does not exist,
+     *                   or any failure while reading the entities
+     */
+    private void importCollectionFromOrgApp(UUID applicationUUID, final Map<String, Object> config,
+                                            final JobExecution jobExecution, S3Import s3Import) throws Exception {
+
+        final Import importEntity = getImportEntity(jobExecution);
+        final ApplicationInfo appInfo = managementService.getApplicationInfo(applicationUUID);
+
+        if (appInfo == null) {
+            throw new ApplicationNotFoundException("Application Not Found");
+        }
+
+        final String collection = config.get("collectionName").toString();
+
+        // prefix of the files to download: "<application name>.<collection name>."
+        final String prefix = prepareInputFileName("application", appInfo.getName(), collection);
+
+        // type 0 = collection import
+        files = fileTransfer(importEntity, prefix, config, s3Import, 0);
+    }
+
+    /**
+     * Imports a single application of an organization.
+     *
+     * @param organizationUUID UUID of the organization (not used by this method)
+     * @param applicationId UUID of the application to import
+     * @param config the import configuration
+     * @param jobExecution the import job details, used to load the Import entity
+     * @param s3Import the S3 import implementation used to download the files
+     * @throws Exception ApplicationNotFoundException when the application does not exist,
+     *                   or any failure while reading the entities
+     */
+    private void importApplicationFromOrg(UUID organizationUUID, UUID applicationId, final Map<String, Object> config,
+                                          final JobExecution jobExecution, S3Import s3Import) throws Exception {
+
+        final Import importEntity = getImportEntity(jobExecution);
+        final ApplicationInfo appInfo = managementService.getApplicationInfo(applicationId);
+
+        if (appInfo == null) {
+            throw new ApplicationNotFoundException("Application Not Found");
+        }
+
+        // prefix of the files to download: "<application name>."
+        final String prefix = prepareInputFileName("application", appInfo.getName(), null);
+
+        // type 1 = application import
+        files = fileTransfer(importEntity, prefix, config, s3Import, 1);
+    }
+
+    /**
+     * Imports every application of an organization.
+     *
+     * @param organizationUUID UUID of the organization to import
+     * @param config the import configuration
+     * @param jobExecution the import job details, used to load the Import entity
+     * @param s3Import the S3 import implementation used to download the files
+     * @throws Exception OrganizationNotFoundException when the organization does not exist,
+     *                   or any failure while reading the entities
+     */
+    private void importApplicationsFromOrg(UUID organizationUUID, final Map<String, Object> config,
+                                           final JobExecution jobExecution, S3Import s3Import) throws Exception {
+
+        final Import importEntity = getImportEntity(jobExecution);
+        final OrganizationInfo orgInfo = managementService.getOrganizationByUuid(organizationUUID);
+
+        if (orgInfo == null) {
+            throw new OrganizationNotFoundException("Organization Not Found");
+        }
+
+        // prefix of the files to download: "<organization name>/"
+        final String prefix = prepareInputFileName("organization", orgInfo.getName(), null);
+
+        // type 2 = organization import
+        files = fileTransfer(importEntity, prefix, config, s3Import, 2);
+    }
+
+    /**
+     * Prepares the prefix path for the files to be imported depending on the endpoint being hit.
+     *
+     * <ul>
+     *   <li>type "organization": {@code <name>/}</li>
+     *   <li>type "application" without collection: {@code <name>.}</li>
+     *   <li>type "application" with collection: {@code <name>.<CollectionName>.}</li>
+     *   <li>any other type (including null): the empty string</li>
+     * </ul>
+     * NOTE(review): the original comments describe the application prefix as
+     * "&lt;org_name&gt;/&lt;app_name&gt;.", so the application name passed in presumably already
+     * contains the organization part - confirm against the callers.
+     *
+     * @param type just a label such as: organization, application.
+     * @param name name of the organization or application
+     * @param CollectionName name of the collection, or null when a whole application is imported
+     * @return the file name prefix built from the name and, if given, the collection name
+     */
+    protected String prepareInputFileName(String type, String name, String CollectionName) {
+        StringBuilder str = new StringBuilder();
+
+        // constant-first comparison so that a null type yields an empty prefix instead of a
+        // NullPointerException
+        if ("organization".equals(type)) {
+
+            str.append(name);
+            str.append("/");
+
+        } else if ("application".equals(type)) {
+
+            str.append(name);
+            str.append(".");
+
+            if (CollectionName != null) {
+
+                // collection import: narrow the prefix down to the collection
+                str.append(CollectionName);
+                str.append(".");
+
+            }
+        }
+
+        return str.toString();
+    }
+
+    /**
+     * Downloads the files for an import from S3.
+     *
+     * When the download fails, the error message and the FAILED state are stored on the given
+     * Import entity and an empty list is returned; the exception is not propagated.
+     *
+     * @param importUG    Import instance
+     * @param appFileName the base file name for the files to be downloaded
+     * @param config      the config information for the import job
+     * @param s3Import    s3import instance
+     * @param type        it indicates the type of import. 0 - Collection , 1 - Application and 2 - Organization
+     * @return the files returned by {@code s3Import.copyFromS3}, or an empty list when the copy failed
+     * @throws Exception if the management entity manager cannot be obtained or the Import
+     *                   entity cannot be updated
+     */
+    public ArrayList<File> fileTransfer(Import importUG, String appFileName, Map<String, Object> config,
+                                        S3Import s3Import, int type) throws Exception {
+
+        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+        // local result, deliberately not the "files" field: callers assign the return value
+        ArrayList<File> downloaded = new ArrayList<File>();
+
+        try {
+            downloaded = s3Import.copyFromS3(config, appFileName, type);
+        } catch (Exception e) {
+            // the failure is reported through the Import entity; log it as well so the
+            // cause is not lost
+            logger.error("copying files with prefix " + appFileName + " from S3 failed", e);
+            importUG.setErrorMessage(e.getMessage());
+            importUG.setState(Import.State.FAILED);
+            rootEm.update(importUG);
+        }
+        return downloaded;
+    }
+
+    /**
+     * Parses the temp file of a file import job and stores the entities from the JSON back
+     * into usergrid.
+     *
+     * Files already marked as completed and files that are not valid JSON are skipped. When the
+     * FileImport entity carries a last updated UUID (anything but " "), parsing resumes after
+     * the object containing that UUID. After the file has been processed, the FileImport is
+     * marked FINISHED and the parent Import is marked FINISHED once all of its file imports
+     * are finished, or FAILED when one of them has failed.
+     *
+     * @param jobExecution the file import job details; the "File" property holds the file path
+     * @throws Exception ApplicationNotFoundException when the application derived from the file
+     *                   name does not exist, IllegalStateException when the resume checkpoint
+     *                   cannot be found in the file, or any parsing/persistence failure
+     */
+    @Override
+    public void FileParser(JobExecution jobExecution) throws Exception {
+
+        // add properties to the import entity
+        FileImport fileImport = getFileImportEntity(jobExecution);
+
+        File file = new File(jobExecution.getJobData().getProperty("File").toString());
+
+        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+        rootEm.update(fileImport);
+
+        // on resume, completed files will not be traversed again
+        if (fileImport.getCompleted()) {
+            return;
+        }
+
+        // validates the JSON structure; isValidJSON records the error on the file import
+        if (!isValidJSON(file, rootEm, fileImport)) {
+            return;
+        }
+
+        // mark the File import job as started
+        fileImport.setState(FileImport.State.STARTED);
+        rootEm.update(fileImport);
+
+        // gets the application name from the filename (everything before the first '.')
+        String applicationName = file.getPath().split("\\.")[0];
+
+        ApplicationInfo application = managementService.getApplicationInfo(applicationName);
+        if (application == null) {
+            // fail early with a meaningful exception instead of a NullPointerException further down
+            throw new ApplicationNotFoundException("Application Not Found: " + applicationName);
+        }
+
+        JsonParser jp = getJsonParserForFile(file);
+        try {
+            // in case of resume, retrieve the last updated UUID for this file
+            String lastUpdatedUUID = fileImport.getLastUpdatedUUID();
+
+            // set once the parser has run out of tokens, so that no loop spins at end of input
+            boolean exhausted = false;
+
+            // this handles partially completed files by updating entities from the point of failure
+            if (lastUpdatedUUID != null && !lastUpdatedUUID.equals(" ")) {
+
+                // go till the last updated entity; getText() is null before the first token is
+                // read, hence the comparison starts from the (non-null) checkpoint
+                while (!lastUpdatedUUID.equals(jp.getText())) {
+                    if (jp.nextToken() == null) {
+                        throw new IllegalStateException("Checkpoint " + lastUpdatedUUID
+                                + " not found in " + file.getPath());
+                    }
+                }
+
+                // skip the last one and start from the next one
+                while (!exhausted
+                        && !(jp.getCurrentToken() == JsonToken.END_OBJECT && jp.nextToken() == JsonToken.START_OBJECT)) {
+                    exhausted = jp.nextToken() == null;
+                }
+            }
+
+            // get to start of an object i.e next entity.
+            while (!exhausted && jp.getCurrentToken() != JsonToken.START_OBJECT) {
+                exhausted = jp.nextToken() == null;
+            }
+
+            // get entity manager for the application
+            EntityManager em = emf.getEntityManager(application.getId());
+
+            if (!exhausted) {
+                JsonToken token;
+                while ((token = jp.nextToken()) != null && token != JsonToken.END_ARRAY) {
+                    // import the entities in this file
+                    importEntityStuff(jp, em, rootEm, fileImport, jobExecution);
+                }
+            }
+        } finally {
+            // release the file handle even when parsing or importing fails
+            jp.close();
+        }
+
+        // Updates the state of file import job. Compared through toString(), like the checks
+        // on the sibling file imports below, so the test works for an enum state as well.
+        if (fileImport.getState().toString().equals("FAILED")) {
+            return;
+        }
+
+        // mark file as completed
+        fileImport.setCompleted(true);
+        fileImport.setState(FileImport.State.FINISHED);
+        rootEm.update(fileImport);
+
+        //check other files status and mark the status of import Job as Finished if all others are finished
+        Results ImportJobResults = rootEm.getConnectingEntities(fileImport, "includes", null, Level.ALL_PROPERTIES);
+        List<Entity> importEntity = ImportJobResults.getEntities();
+        if (importEntity == null || importEntity.isEmpty()) {
+            logger.error("no import job is connected to file import " + fileImport.getUuid());
+            return;
+        }
+        UUID importId = importEntity.get(0).getUuid();
+        Import importUG = rootEm.get(importId, Import.class);
+
+        Results entities = rootEm.getConnectedEntities( importUG, "includes", null, Level.ALL_PROPERTIES);
+        List<Entity> importFile = entities.getEntities();
+
+        int count = 0;
+        for (Entity eachEntity : importFile) {
+            FileImport fi = rootEm.get(eachEntity.getUuid(), FileImport.class);
+            if (fi.getState().toString().equals("FINISHED")) {
+                count++;
+            } else if (fi.getState().toString().equals("FAILED")) {
+                // one failed file fails the whole import
+                importUG.setState(Import.State.FAILED);
+                rootEm.update(importUG);
+                break;
+            }
+        }
+        if (count == importFile.size()) {
+            importUG.setState(Import.State.FINISHED);
+            rootEm.update(importUG);
+        }
+    }
+
+    /**
+     * Checks if a file is a valid JSON by reading it token by token to the end.
+     * When the file cannot be parsed or read, the error message is stored on the file import
+     * entity.
+     *
+     * @param collectionFile the file being validated
+     * @param rootEm    the Entity Manager for the Management application
+     * @param fileImport the file import entity
+     * @return true when the whole file could be tokenized, false otherwise
+     * @throws Exception if the file import entity cannot be updated
+     */
+    private boolean isValidJSON(File collectionFile, EntityManager rootEm, FileImport fileImport) throws Exception {
+
+        boolean valid = false;
+        JsonParser jp = null;
+        try {
+            jp = jsonFactory.createJsonParser(collectionFile);
+            // consume every token; a malformed file makes nextToken() throw
+            while (jp.nextToken() != null) {
+            }
+            valid = true;
+        } catch (JsonParseException e) {
+            logger.error("file " + collectionFile.getPath() + " is not valid JSON", e);
+            fileImport.setErrorMessage(e.getMessage());
+            rootEm.update(fileImport);
+        } catch (IOException e) {
+            logger.error("file " + collectionFile.getPath() + " could not be read", e);
+            fileImport.setErrorMessage(e.getMessage());
+            rootEm.update(fileImport);
+        } finally {
+            // the parser holds the file open; release it on every path
+            if (jp != null) {
+                try {
+                    jp.close();
+                } catch (IOException ignored) {
+                    // validation result is already known; a failing close does not change it
+                    logger.warn("could not close JSON parser for " + collectionFile.getPath(), ignored);
+                }
+            }
+        }
+        return valid;
+    }
+
+
+    /**
+     * Creates a JSON parser for the given file and attaches an ObjectMapper as its codec.
+     *
+     * @param collectionFile the file for which JSON parser is required
+     * @return a new parser over the file; the caller is responsible for closing it
+     * @throws Exception if the parser cannot be created
+     */
+    private JsonParser getJsonParserForFile(File collectionFile) throws Exception {
+        final JsonParser parser = jsonFactory.createJsonParser(collectionFile);
+        final ObjectMapper codec = new ObjectMapper();
+        parser.setCodec(codec);
+        return parser;
+    }
+
+
+
+    /**
+     * Imports the entity's connecting references (collections, connections and dictionaries).
+     *
+     * Wraps the parser in a JsonParserObservable, turns it into an Observable of WriteEvents
+     * and calls doWrite() on every emitted event. The chain ends in toBlocking().last(), so
+     * this method waits for the last event before it returns.
+     *
+     * @param jp  JsonParser pointing to the beginning of the object.
+     * @param em Entity Manager for the application being imported
+     * @param rootEm Entity manager for the root application
+     * @param fileImport the file import entity
+     * @param jobExecution  execution details for the import job
+     * @throws Exception
+     */
+    private void importEntityStuff(final JsonParser jp, final EntityManager em, final EntityManager rootEm, final FileImport fileImport, final JobExecution jobExecution) throws Exception {
+
+        // source of the events; presumably emits one WriteEvent per entity, connection and
+        // dictionary it reads from the parser - its body is not part of this view
+        final JsonParserObservable subscribe = new JsonParserObservable(jp, em, rootEm, fileImport);
+
+        final Observable<WriteEvent> observable = Observable.create(subscribe);
+
+        /**
+         * This is the action we want to perform for every WriteEvent we receive
+         */
+        final Action1<WriteEvent> doWork = new Action1<WriteEvent>() {
+            @Override
+            public void call(WriteEvent writeEvent) {
+                writeEvent.doWrite(em, jobExecution, fileImport);
+            }
+
+        };
+
+
+        // NOTE(review): both counters are only used by the commented-out code below
+        final AtomicLong entityCounter = new AtomicLong();
+        final AtomicLong eventCounter = new AtomicLong();
+        /**
+         * This is boilerplate glue code.  We have to follow this for the parallel operation.  In the "call"
+         * method we want to simply return the input observable + the chain of operations we want to invoke
+         */
+        observable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+            @Override
+            public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+
+                /* TODO:
+                 * needs to be fixed so that the number of entities created can be counted correctly
+                 * and the last updated UUID of the fileImport is updated, which is a must for resumability.
+                 * Until then the checkpointing and heartbeat code below stays disabled.
+                 */
+//                return entityWrapperObservable.doOnNext(doWork).doOnNext(new Action1<WriteEvent>() {
+//
+//                         @Override
+//                         public void call(WriteEvent writeEvent) {
+//                             if (!(writeEvent instanceof EntityEvent)) {
+//                                 final long val = eventCounter.incrementAndGet();
+//                                 if(val % 50 == 0) {
+//                                     jobExecution.heartbeat();
+//                                 }
+//                                 return;
+//                             }
+//
+//                             final long value = entityCounter.incrementAndGet();
+//                             if (value % 2000 == 0) {
+//                                 try {
+//                                     logger.error("UUID = " +((EntityEvent) writeEvent).getEntityUuid().toString() + " value = " + value +"");
+//                                     fileImport.setLastUpdatedUUID(((EntityEvent) writeEvent).getEntityUuid().toString());
+//                                     //checkpoint the UUID here.
+//                                     rootEm.update(fileImport);
+//                                 } catch(Exception ex) {}
+//                             }
+//                             if(value % 100 == 0) {
+//                                 logger.error("heartbeat sent by " + fileImport.getFileName());
+//                                 jobExecution.heartbeat();
+//                             }
+//                         }
+//                     }
+//                );
+
+                // only the write itself is performed for each event
+                return entityWrapperObservable.doOnNext(doWork);
+            }
+        }, Schedulers.io()).toBlocking().last();
+    }
+
+    /**
+     * A single write to be performed during a file import.
+     */
+    private interface WriteEvent {
+        /**
+         * Performs the write.
+         *
+         * @param em entity manager of the application being imported
+         * @param jobExecution execution details of the file import job
+         * @param fileImport the file import entity, used to record errors
+         */
+        void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport);
+    }
+
+    /**
+     * Write event that creates one entity with a given UUID, type and set of properties.
+     */
+    private final class EntityEvent implements WriteEvent {
+        UUID entityUuid;
+        String entityType;
+        Map<String, Object> properties;
+
+        EntityEvent(UUID entityUuid, String entityType, Map<String, Object> properties) {
+            this.entityUuid = entityUuid;
+            this.entityType = entityType;
+            this.properties = properties;
+        }
+
+        public UUID getEntityUuid() {
+            return entityUuid;
+        }
+
+        /**
+         * Creates the entity. A failure does not abort the import: it is logged and its
+         * message is stored on the file import entity.
+         */
+        @Override
+        public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
+            EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+
+            try {
+                em.create(entityUuid, entityType, properties);
+            } catch (Exception e) {
+                logger.error("failed to create entity " + entityUuid + " of type " + entityType, e);
+                fileImport.setErrorMessage(e.getMessage());
+                try {
+                    rootEm.update(fileImport);
+                } catch (Exception ex) {
+                    // best effort: the import goes on, but do not lose the failure silently
+                    logger.error("failed to store the error message on the file import entity", ex);
+                }
+            }
+        }
+    }
+
+    /**
+     * Write event that creates one connection of a given type between two entities.
+     */
+    private final class ConnectionEvent implements WriteEvent {
+        EntityRef ownerEntityRef;
+        String connectionType;
+        EntityRef entryRef;
+
+        ConnectionEvent(EntityRef ownerEntityRef, String connectionType, EntityRef entryRef) {
+            this.ownerEntityRef = ownerEntityRef;
+            this.connectionType = connectionType;
+            this.entryRef = entryRef;
+
+        }
+
+        /**
+         * Creates the connection between the entities. A failure does not abort the import:
+         * it is logged and its message is stored on the file import entity.
+         */
+        @Override
+        public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
+            EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+
+            try {
+                em.createConnection(ownerEntityRef, connectionType, entryRef);
+            } catch (Exception e) {
+                logger.error("failed to create connection of type " + connectionType, e);
+                fileImport.setErrorMessage(e.getMessage());
+                try {
+                    rootEm.update(fileImport);
+                } catch (Exception ex) {
+                    // best effort: the import goes on, but do not lose the failure silently
+                    logger.error("failed to store the error message on the file import entity", ex);
+                }
+            }
+        }
+    }
+
+    private final class DictionaryEvent implements WriteEvent {
+
+        EntityRef ownerEntityRef;
+        String dictionaryName;
+        Map<String, Object> dictionary;
+
+        DictionaryEvent(EntityRef ownerEntityRef, String dictionaryName, Map<String, Object> dictionary) {
+            this.ownerEntityRef = ownerEntityRef;
+            this.dictionaryName = dictionaryName;
+            this.dictionary = dictionary;
+        }
+
+        // adds map to the dictionary
+        @Override
+        public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
+            EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+            try {
+                em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
+            } catch (Exception e) {
+                fileImport.setErrorMessage(e.getMessage());
+                try {
+                    rootEm.update(fileImport);
+                } catch (Exception ex) {
+                }
+            }
+        }
+    }
+
+
+    private final class JsonParserObservable implements Observable.OnSubscribe<WriteEvent> { // emits one WriteEvent per parsed entity, connection or dictionary
+        private final JsonParser jp;
+        EntityManager em; // stored by the constructor; not read by call()
+        EntityManager rootEm; // used only to persist fileImport's error message on failure
+        FileImport fileImport;
+
+
+        JsonParserObservable(JsonParser parser, EntityManager em, EntityManager rootEm, FileImport fileImport) {
+            this.jp = parser;
+            this.em = em;
+            this.rootEm = rootEm;
+            this.fileImport = fileImport;
+        }
+
+        @Override
+        public void call(final Subscriber<? super WriteEvent> subscriber) {
+            // Reads tokens until an END_OBJECT at this level or until the subscriber unsubscribes.
+            WriteEvent entityWrapper = null;
+            EntityRef ownerEntityRef = null; // set by the entity branch below; still null if "connections"/"dictionaries" come first
+            String entityUuid = "";
+            String entityType = "";
+            try {
+                while (!subscriber.isUnsubscribed() && jp.nextToken() != JsonToken.END_OBJECT) {
+                    String collectionName = jp.getCurrentName();
+
+                    // "connections": emit one ConnectionEvent per listed UUID, owned by the most recently parsed entity
+                    if (collectionName.equals("connections")) {
+
+                        jp.nextToken(); // START_OBJECT
+                        while (jp.nextToken() != JsonToken.END_OBJECT) {
+                            String connectionType = jp.getCurrentName();
+
+                            jp.nextToken(); // START_ARRAY
+                            while (jp.nextToken() != JsonToken.END_ARRAY) {
+                                String entryId = jp.getText();
+
+                                EntityRef entryRef = new SimpleEntityRef(UUID.fromString(entryId));
+                                entityWrapper = new ConnectionEvent(ownerEntityRef, connectionType, entryRef);
+
+                                // Hand the connection event to the subscriber
+                                subscriber.onNext(entityWrapper);
+                            }
+                        }
+
+                    }
+                    // "dictionaries": emit one DictionaryEvent per named dictionary, owned by the most recently parsed entity
+                    else if (collectionName.equals("dictionaries")) {
+
+                        jp.nextToken(); // START_OBJECT
+                        while (jp.nextToken() != JsonToken.END_OBJECT) {
+
+                            String dictionaryName = jp.getCurrentName();
+
+                            jp.nextToken();
+
+                            Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
+                            entityWrapper = new DictionaryEvent(ownerEntityRef, dictionaryName, dictionary);
+
+                            // Hand the dictionary event to the subscriber
+                            subscriber.onNext(entityWrapper);
+                        }
+                        subscriber.onCompleted(); // NOTE(review): the only onCompleted() in call(); the outer loop condition is evaluated again afterwards — confirm intended
+
+                    } else {
+
+                        // Regular collections: read one entity object's scalar fields
+                        jp.nextToken(); // START_OBJECT
+
+                        Map<String, Object> properties = new HashMap<String, Object>();
+                        JsonToken token = jp.nextToken();
+
+                        while (token != JsonToken.END_OBJECT) { // NOTE(review): the first END_OBJECT seen ends this loop, including that of a nested object — confirm input is flat
+                            if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) { // every other token kind is skipped
+                                String key = jp.getCurrentName();
+                                if (key.equals("uuid")) {
+                                    entityUuid = jp.getText();
+
+                                } else if (key.equals("type")) {
+                                    entityType = jp.getText();
+                                } else if (key.length() != 0 && jp.getText().length() != 0) {
+                                    String value = jp.getText();
+                                    properties.put(key, value); // stored as text, including VALUE_NUMBER_INT values
+                                }
+                            }
+                            token = jp.nextToken();
+                        }
+
+                        ownerEntityRef = new SimpleEntityRef(entityType, UUID.fromString(entityUuid));
+                        entityWrapper = new EntityEvent(UUID.fromString(entityUuid), entityType, properties);
+
+                        // Hand the entity event to the subscriber
+                        subscriber.onNext(entityWrapper);
+
+                    }
+                }
+            } catch (Exception e) {
+                // Any failure (e.g. an illegal UUID string) is recorded on fileImport and ends the stream with onError
+                fileImport.setErrorMessage(e.getMessage());
+                try {
+                    rootEm.update(fileImport);
+                } catch (Exception ex) { // best-effort: a failed update of fileImport is ignored
+                }
+                subscriber.onError(e);
+            }
+        }
+    }
+}
+
+/**
+ * Checked exception raised when an organization cannot be found.
+ */
+class OrganizationNotFoundException extends Exception {
+    OrganizationNotFoundException(String message) {
+        super(message);
+    }
+}
+
+/**
+ * Checked exception raised when an application cannot be found.
+ */
+class ApplicationNotFoundException extends Exception {
+    ApplicationNotFoundException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
new file mode 100644
index 0000000..e21fe0a
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.management.importer;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.io.File;
+
+/**
+ * Copies import files from an S3 bucket to local files.
+ */
+public interface S3Import {
+
+    /**
+     * Downloads the bucket files selected by {@code filename} and {@code type}.
+     *
+     * @param importInfo the import request; {@code S3ImportImpl} reads the bucket name and the S3
+     *                   credentials from its {@code "properties"} / {@code "storage_info"} entries
+     * @param filename   the name fragment used to select files in the bucket
+     * @param type       the kind of import, as documented on {@code S3ImportImpl}:
+     *                   0 - collection, 1 - application, 2 - organization
+     * @return the local files that were downloaded
+     */
+    ArrayList<File> copyFromS3(Map<String, Object> importInfo, String filename, int type);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
new file mode 100644
index 0000000..ea999f6
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.management.importer;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Module;
+import org.jclouds.ContextBuilder;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.domain.MutableBlobMetadata;
+import org.jclouds.blobstore.domain.PageSet;
+import org.jclouds.blobstore.domain.StorageMetadata;
+import org.jclouds.blobstore.options.ListContainerOptions;
+import org.jclouds.http.config.JavaUrlHttpCommandExecutorServiceModule;
+import org.jclouds.logging.log4j.config.Log4JLoggingModule;
+import org.jclouds.netty.config.NettyPayloadModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * {@link S3Import} implementation that lists a bucket through jclouds and downloads the matching
+ * blobs into local files.
+ */
+public class S3ImportImpl implements S3Import {
+
+    private static final Logger logger = LoggerFactory.getLogger(S3ImportImpl.class);
+
+    private BlobStore blobStore;
+
+    // Every file downloaded through this instance. The list is never cleared, so repeated calls
+    // on one instance accumulate. NOTE(review): confirm instances are not shared between imports.
+    private ArrayList<File> files = new ArrayList<File>();
+
+    /**
+     * Downloads the files from s3 into temp local files.
+     * <p>
+     * Failures are logged and not rethrown; the files downloaded before the failure are returned.
+     *
+     * @param importInfo the information entered by the user required to perform import from S3
+     * @param filename the filename generated based on the request URI
+     * @param type  it indicates the type of import. 0 - Collection , 1 - Application and 2 - Organization
+     * @return It returns an ArrayList of files i.e. the files downloaded from s3
+     */
+    @Override
+    public ArrayList<File> copyFromS3( final Map<String,Object> importInfo, String filename , int type) {
+
+        @SuppressWarnings("unchecked") // the request map is untyped JSON; shape is fixed by the caller
+        Map<String,Object> properties = ( Map<String, Object> ) importInfo.get( "properties" );
+
+        @SuppressWarnings("unchecked")
+        Map<String, Object> storageInfo = (Map<String,Object>)properties.get( "storage_info" );
+
+        String bucketName = ( String ) storageInfo.get( "bucket_location" );
+        String accessId = ( String ) storageInfo.get( "s3_access_id" );
+        String secretKey = ( String ) storageInfo.get( "s3_key" );
+
+        Properties overrides = new Properties();
+        overrides.setProperty( "s3" + ".identity", accessId );
+        overrides.setProperty( "s3" + ".credential", secretKey );
+
+        final Iterable<? extends Module> MODULES = ImmutableSet
+                .of(new JavaUrlHttpCommandExecutorServiceModule(), new Log4JLoggingModule(),
+                        new NettyPayloadModule());
+
+        BlobStoreContext context =
+                ContextBuilder.newBuilder("s3").credentials( accessId, secretKey ).modules( MODULES )
+                        .overrides( overrides ).buildView( BlobStoreContext.class );
+
+        try {
+            blobStore = context.getBlobStore();
+
+            // A listing is returned one page at a time; follow the marker until it runs out so
+            // that buckets larger than a single page are imported completely.
+            String marker = null;
+            do {
+                ListContainerOptions options = new ListContainerOptions().recursive();
+                if (marker != null) {
+                    options.afterMarker(marker);
+                }
+                PageSet<? extends StorageMetadata> page = blobStore.list(bucketName, options);
+
+                for (StorageMetadata metadata : page) {
+                    String fname = metadata.getName();
+                    if (matchesImportType(fname, filename, type)) {
+                        copyFile(bucketName, fname);
+                    }
+                }
+                marker = page.getNextMarker();
+            } while (marker != null);
+        } catch (Exception e) {
+            // Best-effort, as before: report the problem and hand back what was downloaded so far.
+            logger.error("Failed to copy import files from bucket " + bucketName, e);
+        } finally {
+            // Releases the connections and threads held by the jclouds context.
+            context.close();
+        }
+        return files;
+    }
+
+    /**
+     * Decides whether a bucket entry belongs to the requested import.
+     *
+     * @param fname the name of the bucket entry
+     * @param filename the name fragment generated from the request; for types 1 and 2 it is used
+     *                 as the leading part of a regular expression, exactly as passed in
+     * @param type 0 - Collection, 1 - Application, 2 - Organization
+     * @return true when the entry should be downloaded; false for any other type value
+     */
+    private static boolean matchesImportType(String fname, String filename, int type) {
+        switch (type) {
+            // collection file: <org_name>/<app_name>.<collection_name>.[0-9]+.json
+            case 0:
+                return fname.contains(filename);
+            // application file: <org_name>/<app_name>.[0-9]+.json
+            case 1:
+                return fname.matches(filename + "[0-9]+\\.json");
+            // organization file: <org_name>/[-a-zA-Z0-9]+.[0-9]+.json
+            case 2:
+                return fname.matches(filename + "[-a-zA-Z0-9]+\\.[0-9]+\\.json");
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Copy the file from s3 into a temp local file
+     * @param bucketName the S3 bucket name from where files need to be imported
+     * @param fname the filename by which the temp file should be created
+     * @throws IOException if the blob does not exist or the local file cannot be written
+     */
+    void copyFile(String bucketName, String fname) throws IOException {
+
+        Blob blob = blobStore.getBlob(bucketName, fname);
+        if (blob == null) {
+            throw new IOException("Blob " + fname + " not found in bucket " + bucketName);
+        }
+
+        String[] fileOrg = fname.split("/");
+        File organizationDirectory = new File(fileOrg[0]);
+
+        if (!organizationDirectory.exists()) {
+            try {
+                organizationDirectory.mkdir();
+            } catch (SecurityException se) {
+                logger.error(se.getMessage());
+            }
+        }
+
+        File ephemeral = new File(fname);
+
+        // Directory first, file second: deleteOnExit removes entries in reverse registration
+        // order, so the file is gone before the directory is attempted.
+        organizationDirectory.deleteOnExit();
+        ephemeral.deleteOnExit();
+
+        FileOutputStream fop = new FileOutputStream(ephemeral);
+        try {
+            blob.getPayload().writeTo(fop);
+        } finally {
+            // Close even when the download fails so the file handle is not leaked.
+            fop.close();
+        }
+
+        files.add(ephemeral);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/test/java/org/apache/usergrid/ServiceITSetup.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/ServiceITSetup.java b/stack/services/src/test/java/org/apache/usergrid/ServiceITSetup.java
index 5a08338..2a04c52 100644
--- a/stack/services/src/test/java/org/apache/usergrid/ServiceITSetup.java
+++ b/stack/services/src/test/java/org/apache/usergrid/ServiceITSetup.java
@@ -20,7 +20,7 @@ package org.apache.usergrid;
 import org.apache.usergrid.management.ApplicationCreator;
 import org.apache.usergrid.management.ManagementService;
 import org.apache.usergrid.management.export.ExportService;
-import org.apache.usergrid.management.importug.ImportService;
+import org.apache.usergrid.management.importer.ImportService;
 import org.apache.usergrid.security.providers.SignInProviderFactory;
 import org.apache.usergrid.security.tokens.TokenService;
 import org.apache.usergrid.services.ServiceManagerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/test/java/org/apache/usergrid/ServiceITSetupImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/ServiceITSetupImpl.java b/stack/services/src/test/java/org/apache/usergrid/ServiceITSetupImpl.java
index 9f5ac16..8cafa68 100644
--- a/stack/services/src/test/java/org/apache/usergrid/ServiceITSetupImpl.java
+++ b/stack/services/src/test/java/org/apache/usergrid/ServiceITSetupImpl.java
@@ -21,7 +21,7 @@ import org.apache.usergrid.cassandra.CassandraResource;
 import org.apache.usergrid.management.ApplicationCreator;
 import org.apache.usergrid.management.ManagementService;
 import org.apache.usergrid.management.export.ExportService;
-import org.apache.usergrid.management.importug.ImportService;
+import org.apache.usergrid.management.importer.ImportService;
 import org.apache.usergrid.persistence.cassandra.CassandraService;
 import org.apache.usergrid.security.providers.SignInProviderFactory;
 import org.apache.usergrid.security.tokens.TokenService;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java
index 3cabb89..5453324 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java
@@ -32,9 +32,9 @@ import org.apache.usergrid.management.UserInfo;
 import org.apache.usergrid.management.export.ExportService;
 import org.apache.usergrid.management.export.S3Export;
 import org.apache.usergrid.management.export.S3ExportImpl;
-import org.apache.usergrid.management.importug.ImportService;
-import org.apache.usergrid.management.importug.S3Import;
-import org.apache.usergrid.management.importug.S3ImportImpl;
+import org.apache.usergrid.management.importer.ImportService;
+import org.apache.usergrid.management.importer.S3Import;
+import org.apache.usergrid.management.importer.S3ImportImpl;
 import org.apache.usergrid.persistence.*;
 import org.apache.usergrid.persistence.entities.JobData;
 import org.jclouds.ContextBuilder;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ImportImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ImportImpl.java b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ImportImpl.java
index b6b54b0..23f264c 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ImportImpl.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ImportImpl.java
@@ -17,15 +17,13 @@
 
 package org.apache.usergrid.management.cassandra;
 
-import org.apache.usergrid.management.importug.S3Import;
+import org.apache.usergrid.management.importer.S3Import;
 
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Map;
 
-/**
- * Created by ApigeeCorporation on 7/8/14.
- */
+
 public class MockS3ImportImpl implements S3Import{
     private final String filename;
 


[2/2] git commit: Completing rename of "importUG" package to "importer" to avoid capital letters in package name.

Posted by sn...@apache.org.
Completing rename of "importUG" package to "importer" to avoid capital letters in package name.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5d6c4acc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5d6c4acc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5d6c4acc

Branch: refs/heads/two-dot-o-import
Commit: 5d6c4accc5e81f0c5b13dff001797abd4e86e759
Parents: 52dbfe4
Author: Dave Johnson <dm...@apigee.com>
Authored: Wed Oct 15 10:01:33 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Wed Oct 15 10:01:33 2014 -0400

----------------------------------------------------------------------
 .../applications/ApplicationResource.java       |   2 +-
 .../management/importUG/FileImportJob.java      | 109 ---
 .../usergrid/management/importUG/ImportJob.java |  91 --
 .../management/importUG/ImportService.java      |  89 --
 .../management/importUG/ImportServiceImpl.java  | 977 -------------------
 .../usergrid/management/importUG/S3Import.java  |  31 -
 .../management/importUG/S3ImportImpl.java       | 160 ---
 .../management/importer/FileImportJob.java      | 109 +++
 .../usergrid/management/importer/ImportJob.java |  91 ++
 .../management/importer/ImportService.java      |  89 ++
 .../management/importer/ImportServiceImpl.java  | 977 +++++++++++++++++++
 .../usergrid/management/importer/S3Import.java  |  31 +
 .../management/importer/S3ImportImpl.java       | 160 +++
 .../org/apache/usergrid/ServiceITSetup.java     |   2 +-
 .../org/apache/usergrid/ServiceITSetupImpl.java |   2 +-
 .../management/cassandra/ImportServiceIT.java   |   6 +-
 .../management/cassandra/MockS3ImportImpl.java  |   6 +-
 17 files changed, 1465 insertions(+), 1467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
index c204b8c..e054739 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
@@ -25,7 +25,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.usergrid.management.ApplicationInfo;
 import org.apache.usergrid.management.OrganizationInfo;
 import org.apache.usergrid.management.export.ExportService;
-import org.apache.usergrid.management.importug.ImportService;
+import org.apache.usergrid.management.importer.ImportService;
 import org.apache.usergrid.rest.AbstractContextResource;
 import org.apache.usergrid.rest.ApiResponse;
 import org.apache.usergrid.rest.applications.ServiceResource;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/main/java/org/apache/usergrid/management/importUG/FileImportJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importUG/FileImportJob.java b/stack/services/src/main/java/org/apache/usergrid/management/importUG/FileImportJob.java
deleted file mode 100644
index 2f0dcba..0000000
--- a/stack/services/src/main/java/org/apache/usergrid/management/importUG/FileImportJob.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.usergrid.management.importug;
-
-import org.apache.usergrid.batch.JobExecution;
-import org.apache.usergrid.batch.job.OnlyOnceJob;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.entities.FileImport;
-import org.apache.usergrid.persistence.entities.Import;
-import org.apache.usergrid.persistence.entities.JobData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import java.util.List;
-import java.util.UUID;
-import static org.apache.usergrid.corepersistence.CpEntityManagerFactory.MANAGEMENT_APPLICATION_ID;
-import org.apache.usergrid.persistence.index.query.Query.Level;
-
-
-@Component("fileImportJob")
-public class FileImportJob extends OnlyOnceJob {
-
-    public static final String FILE_IMPORT_ID = "fileImportId";
-    private static final Logger logger = LoggerFactory.getLogger(FileImportJob.class);
-
-    // injected the Entity Manager Factory
-    protected EntityManagerFactory emf;
-
-    @Autowired
-    ImportService importService;
-
-    public FileImportJob() {
-        logger.info( "FileImportJob created " + this );
-    }
-
-    @Override
-    protected void doJob(JobExecution jobExecution) throws Exception {
-        logger.info( "execute FileImportJob {}", jobExecution );
-
-        JobData jobData = jobExecution.getJobData();
-        if ( jobData == null ) {
-            logger.error( "jobData cannot be null" );
-            return;
-        }
-
-        // heartbeat to indicate job has started
-        jobExecution.heartbeat();
-
-        // call the File Parser for the file set in job execution
-        importService.FileParser( jobExecution );
-
-        logger.error("File Import Service completed job");
-    }
-
-    @Override
-    protected long getDelay(JobExecution execution) throws Exception {
-        return 100;
-    }
-
-    @Autowired
-    public void setImportService( final ImportService importService ) {
-        this.importService = importService;
-    }
-
-    /**
-     * This method is called when the job is retried maximum times by the scheduler but still fails.
-     * Thus the scheduler marks it as DEAD.
-     */
-    @Override
-    public void dead( final JobExecution execution ) throws Exception {
-
-        // Get the root entity manager
-        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-
-        // Mark the sub-job i.e. File Import Job as Failed
-        FileImport fileImport = importService.getFileImportEntity(execution);
-        fileImport.setErrorMessage("The Job has been tried maximum times but still failed");
-        fileImport.setState(FileImport.State.FAILED);
-        rootEm.update(fileImport);
-
-        // If one file Job fails, mark the main import Job also as failed
-        Results ImportJobResults = rootEm.getConnectingEntities( 
-                fileImport, "includes", null, Level.ALL_PROPERTIES);
-        List<Entity> importEntity = ImportJobResults.getEntities();
-        UUID importId = importEntity.get(0).getUuid();
-        Import importUG = rootEm.get(importId, Import.class);
-        importUG.setState(Import.State.FAILED);
-        rootEm.update(importUG);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportJob.java b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportJob.java
deleted file mode 100644
index 089ecd6..0000000
--- a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportJob.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.usergrid.management.importug;
-
-import org.apache.usergrid.batch.JobExecution;
-import org.apache.usergrid.batch.job.OnlyOnceJob;
-import static org.apache.usergrid.corepersistence.CpEntityManagerFactory.MANAGEMENT_APPLICATION_ID;
-import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.persistence.entities.Import;
-import org.apache.usergrid.persistence.entities.JobData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-
-@Component("importJob")
-public class ImportJob extends OnlyOnceJob {
-
-    public static final String IMPORT_ID = "importId";
-    private static final Logger logger = LoggerFactory.getLogger(ImportJob.class);
-
-    //injected the Entity Manager Factory
-    protected EntityManagerFactory emf;
-    @Autowired
-    ImportService importService;
-
-    public ImportJob() {
-        logger.info( "ImportJob created " + this );
-    }
-
-    @Override
-    protected void doJob(JobExecution jobExecution) throws Exception {
-        logger.info( "execute ImportJob {}", jobExecution );
-
-        JobData jobData = jobExecution.getJobData();
-        if ( jobData == null ) {
-            logger.error( "jobData cannot be null" );
-            return;
-        }
-
-        // heartbeat to indicate job has started
-        jobExecution.heartbeat();
-
-        // call the doImport method from import service which schedules the sub-jobs i.e. parsing of files to FileImport Job
-        importService.doImport( jobExecution );
-
-        logger.error("Import Service completed job");
-    }
-
-    @Override
-    protected long getDelay(JobExecution execution) throws Exception {
-        return 100;
-    }
-
-    @Autowired
-    public void setImportService( final ImportService importService ) {
-        this.importService = importService;
-    }
-
-    /*
-    This method is called when the job is retried maximum times by the scheduler but still fails. Thus the scheduler marks it as DEAD.
-     */
-    @Override
-    public void dead( final JobExecution execution ) throws Exception {
-
-        // marks the job as failed as it will not be retried by the scheduler.
-        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-        Import importUG = importService.getImportEntity(execution);
-        importUG.setErrorMessage("The Job has been tried maximum times but still failed");
-        importUG.setState(Import.State.FAILED);
-        rootEm.update(importUG);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportService.java b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportService.java
deleted file mode 100644
index 3e8aa4a..0000000
--- a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportService.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.usergrid.management.importug;
-
-import org.apache.usergrid.batch.JobExecution;
-import org.apache.usergrid.persistence.entities.FileImport;
-import org.apache.usergrid.persistence.entities.Import;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- * Performs all functions related to importing
- */
-public interface ImportService {
-
-    /**
-     * Schedules the import to execute
-     */
-    UUID schedule(Map<String, Object> json) throws Exception;
-
-    /**
-     * Perform the import from the external resource
-     */
-    void doImport(JobExecution jobExecution) throws Exception;
-
-    /**
-     * Parses the input file and creates entities
-     *
-     * @param jobExecution
-     * @throws Exception
-     */
-    void FileParser(JobExecution jobExecution) throws Exception;
-
-    /**
-     * Get the state for the Job with UUID
-     * @param uuid Job UUID
-     * @return State of Job
-     * @throws Exception
-     */
-    String getState(UUID uuid) throws Exception;
-
-    /**
-     * Returns error message for the job with UUID
-     * @param uuid Job UUID
-     * @return error message
-     * @throws Exception
-     */
-    String getErrorMessage(UUID uuid) throws Exception;
-
-    /**
-     * Returns all the temp files downloaded from s3
-     * @return the list of downloaded files from S3.
-     */
-    ArrayList<File> getEphemeralFile();
-
-    /**
-     * @param jobExecution
-     * @return FileImportEntity
-     * @throws Exception
-     */
-    FileImport getFileImportEntity(final JobExecution jobExecution) throws Exception;
-
-    /**
-     * @param jobExecution
-     * @return ImportEntity
-     * @throws Exception
-     */
-    Import getImportEntity(final JobExecution jobExecution) throws Exception;
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
deleted file mode 100644
index 5cd505a..0000000
--- a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
+++ /dev/null
@@ -1,977 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.usergrid.management.importug;
-
-import org.apache.usergrid.batch.JobExecution;
-import org.apache.usergrid.batch.service.SchedulerService;
-import org.apache.usergrid.management.ApplicationInfo;
-import org.apache.usergrid.management.ManagementService;
-import org.apache.usergrid.management.OrganizationInfo;
-import org.apache.usergrid.persistence.*;
-import org.apache.usergrid.persistence.entities.FileImport;
-import org.apache.usergrid.persistence.entities.Import;
-import org.apache.usergrid.persistence.entities.JobData;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import rx.Observable;
-import rx.Subscriber;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicLong;
-import static org.apache.usergrid.corepersistence.CpEntityManagerFactory.MANAGEMENT_APPLICATION_ID;
-import org.apache.usergrid.persistence.index.query.Query.Level;
-
-
-public class ImportServiceImpl implements ImportService {
-
-    public static final String IMPORT_ID = "importId";
-    public static final String IMPORT_JOB_NAME = "importJob";
-    public static final String FILE_IMPORT_ID = "fileImportId";
-    public static final String FILE_IMPORT_JOB_NAME = "fileImportJob";
-
-    //Amount of time that has passed before sending another heart beat in millis
-    public static final int TIMESTAMP_DELTA = 5000;
-
-    private static final Logger logger = LoggerFactory.getLogger(ImportServiceImpl.class);
-
-    //injected the Entity Manager Factory
-    protected EntityManagerFactory emf;
-    private ArrayList<File> files;
-
-    //dependency injection
-    private SchedulerService sch;
-
-    //inject Management Service to access Organization Data
-    private ManagementService managementService;
-    private JsonFactory jsonFactory = new JsonFactory();
-
-    private int entityCount = 0;
-
-    /**
-     * This schedules the main import Job
-     *
-     * @param config configuration of the job to be scheduled
-     * @return it returns the UUID of the scheduled job
-     * @throws Exception
-     */
-    @Override
-    public UUID schedule(Map<String, Object> config) throws Exception {
-
-        if (config == null) {
-            logger.error("import information cannot be null");
-            return null;
-        }
-
-        EntityManager rootEm = null;
-        try {
-            rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-            Set<String> collections = rootEm.getApplicationCollections();
-            if (!collections.contains("imports")) {
-                rootEm.createApplicationCollection("imports");
-            }
-        } catch (Exception e) {
-            logger.error("application doesn't exist within the current context");
-            return null;
-        }
-
-        Import importUG = new Import();
-
-        // create the import entity to store all metadata about the import job
-        try {
-            importUG = rootEm.create(importUG);
-        } catch (Exception e) {
-            logger.error("Import entity creation failed");
-            return null;
-        }
-
-        // update state for import job to created
-        importUG.setState(Import.State.CREATED);
-        rootEm.update(importUG);
-
-        // set data to be transferred to importInfo
-        JobData jobData = new JobData();
-        jobData.setProperty("importInfo", config);
-        jobData.setProperty(IMPORT_ID, importUG.getUuid());
-
-        long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
-
-        // schedule import job
-        sch.createJob(IMPORT_JOB_NAME, soonestPossible, jobData);
-
-        // update state for import job to created
-        importUG.setState(Import.State.SCHEDULED);
-        rootEm.update(importUG);
-
-        return importUG.getUuid();
-    }
-
-    /**
-     * This schedules the sub  FileImport Job
-     *
-     * @param file file to be scheduled
-     * @return it returns the UUID of the scheduled job
-     * @throws Exception
-     */
-    public UUID scheduleFile(String file, EntityRef importRef) throws Exception {
-
-        EntityManager rootEm = null;
-
-        try {
-            rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-        } catch (Exception e) {
-            logger.error("application doesn't exist within the current context");
-            return null;
-        }
-
-        // create a FileImport entity to store metadata about the fileImport job
-        FileImport fileImport = new FileImport();
-
-        fileImport.setFileName(file);
-        fileImport.setCompleted(false);
-        fileImport.setLastUpdatedUUID(" ");
-        fileImport.setErrorMessage(" ");
-        fileImport.setState(FileImport.State.CREATED);
-        fileImport = rootEm.create(fileImport);
-
-        Import importUG = rootEm.get(importRef, Import.class);
-
-        try {
-            // create a connection between the main import job and the sub FileImport Job
-            rootEm.createConnection(importUG, "includes", fileImport);
-        } catch (Exception e) {
-            logger.error(e.getMessage());
-            return null;
-        }
-
-        // mark the File Import Job as created
-        fileImport.setState(FileImport.State.CREATED);
-        rootEm.update(fileImport);
-
-        //set data to be transferred to the FileImport Job
-        JobData jobData = new JobData();
-        jobData.setProperty("File", file);
-        jobData.setProperty(FILE_IMPORT_ID, fileImport.getUuid());
-
-        long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
-
-        //schedule file import job
-        sch.createJob(FILE_IMPORT_JOB_NAME, soonestPossible, jobData);
-
-        //update state of the job to Scheduled
-        fileImport.setState(FileImport.State.SCHEDULED);
-        rootEm.update(fileImport);
-
-        return fileImport.getUuid();
-    }
-
-    /**
-     * Query Entity Manager for the state of the Import Entity. This corresponds to the GET /import
-     *
-     * @return String
-     */
-    @Override
-    public String getState(UUID uuid) throws Exception {
-        if (uuid == null) {
-            logger.error("UUID passed in cannot be null.");
-            return "UUID passed in cannot be null";
-        }
-
-        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-
-        //retrieve the import entity.
-        Import importUG = rootEm.get(uuid, Import.class);
-
-        if (importUG == null) {
-            logger.error("no entity with that uuid was found");
-            return "No Such Element found";
-        }
-        return importUG.getState().toString();
-    }
-
-    /**
-     * Query Entity Manager for the error message generated for an import job.
-     *
-     * @return String
-     */
-    @Override
-    public String getErrorMessage(final UUID uuid) throws Exception {
-
-        //get application entity manager
-
-        if (uuid == null) {
-            logger.error("UUID passed in cannot be null.");
-            return "UUID passed in cannot be null";
-        }
-
-        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-
-        //retrieve the import entity.
-        Import importUG = rootEm.get(uuid, Import.class);
-
-        if (importUG == null) {
-            logger.error("no entity with that uuid was found");
-            return "No Such Element found";
-        }
-        return importUG.getErrorMessage().toString();
-    }
-
-    /**
-     * Returns the Import Entity that stores all meta-data for the particular import Job
-     * @param jobExecution the import job details
-     * @return Import Entity
-     * @throws Exception
-     */
-    @Override
-    public Import getImportEntity(final JobExecution jobExecution) throws Exception {
-
-        UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
-        EntityManager importManager = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-
-        return importManager.get(importId, Import.class);
-    }
-
-
-    /**
-     * Returns the File Import Entity that stores all meta-data for the particular sub File import Job
-     * @param jobExecution the file import job details
-     * @return File Import Entity
-     * @throws Exception
-     */
-    @Override
-    public FileImport getFileImportEntity(final JobExecution jobExecution) throws Exception {
-
-        UUID fileImportId = (UUID) jobExecution.getJobData().getProperty(FILE_IMPORT_ID);
-        EntityManager em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-
-        return em.get(fileImportId, FileImport.class);
-    }
-
-    /**
-     * This returns the temporary files downloaded form s3
-     */
-    @Override
-    public ArrayList<File> getEphemeralFile() {
-        return files;
-    }
-
-    public SchedulerService getSch() {
-        return sch;
-    }
-
-
-    public void setSch(final SchedulerService sch) {
-        this.sch = sch;
-    }
-
-
-    public EntityManagerFactory getEmf() {
-        return emf;
-    }
-
-
-    public void setEmf(final EntityManagerFactory emf) {
-        this.emf = emf;
-    }
-
-
-    public ManagementService getManagementService() {
-
-        return managementService;
-    }
-
-
-    public void setManagementService(final ManagementService managementService) {
-        this.managementService = managementService;
-    }
-
-    /**
-     * This method gets the files from s3 and also creates sub-jobs for each file i.e. File Import Jobs
-     * @param jobExecution the job created by the scheduler with all the required config data
-     * @throws Exception
-     */
-    @Override
-    public void doImport(JobExecution jobExecution) throws Exception {
-
-        Map<String, Object> config = (Map<String, Object>) jobExecution.getJobData().getProperty("importInfo");
-        Object s3PlaceHolder = jobExecution.getJobData().getProperty("s3Import");
-        S3Import s3Import = null;
-
-        if (config == null) {
-            logger.error("Import Information passed through is null");
-            return;
-        }
-
-        //get the entity manager for the application, and the entity that this Import corresponds to.
-        UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
-
-        EntityManager rooteEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-        Import importUG = rooteEm.get(importId, Import.class);
-
-        //update the entity state to show that the job has officially started.
-        importUG.setState(Import.State.STARTED);
-        importUG.setStarted(System.currentTimeMillis());
-        importUG.setErrorMessage(" ");
-        rooteEm.update(importUG);
-        try {
-            if (s3PlaceHolder != null) {
-                s3Import = (S3Import) s3PlaceHolder;
-            } else {
-                s3Import = new S3ImportImpl();
-            }
-        } catch (Exception e) {
-            logger.error("S3Import doesn't exist");
-            importUG.setErrorMessage(e.getMessage());
-            importUG.setState(Import.State.FAILED);
-            rooteEm.update(importUG);
-            return;
-        }
-
-        try {
-
-            if (config.get("organizationId") == null) {
-                logger.error("No organization could be found");
-                importUG.setErrorMessage("No organization could be found");
-                importUG.setState(Import.State.FAILED);
-                rooteEm.update(importUG);
-                return;
-            } else if (config.get("applicationId") == null) {
-                //import All the applications from an organization
-                importApplicationsFromOrg((UUID) config.get("organizationId"), config, jobExecution, s3Import);
-            } else if (config.get("collectionName") == null) {
-                //imports an Application from a single organization
-                importApplicationFromOrg((UUID) config.get("organizationId"), (UUID) config.get("applicationId"), config, jobExecution, s3Import);
-            } else {
-                //imports a single collection from an app org combo
-                importCollectionFromOrgApp((UUID) config.get("applicationId"), config, jobExecution, s3Import);
-            }
-
-        } catch (OrganizationNotFoundException e) {
-            importUG.setErrorMessage(e.getMessage());
-            importUG.setState(Import.State.FINISHED);
-            rooteEm.update(importUG);
-            return;
-        } catch (ApplicationNotFoundException e) {
-            importUG.setErrorMessage(e.getMessage());
-            importUG.setState(Import.State.FINISHED);
-            rooteEm.update(importUG);
-            return;
-        }
-
-        if (files.size() == 0) {
-
-            importUG.setState(Import.State.FINISHED);
-            importUG.setErrorMessage("no files found in the bucket with the relevant context");
-            rooteEm.update(importUG);
-
-        } else {
-
-            Map<String, Object> fileMetadata = new HashMap<String, Object>();
-
-            ArrayList<Map<String, Object>> value = new ArrayList<Map<String, Object>>();
-
-            // schedule each file as a separate job
-            for (File eachfile : files) {
-
-                UUID jobID = scheduleFile(eachfile.getPath(), importUG);
-                Map<String, Object> fileJobID = new HashMap<String, Object>();
-                fileJobID.put("FileName", eachfile.getName());
-                fileJobID.put("JobID", jobID.toString());
-                value.add(fileJobID);
-            }
-
-            fileMetadata.put("files", value);
-            importUG.addProperties(fileMetadata);
-            rooteEm.update(importUG);
-        }
-        return;
-    }
-
-    /**
-     * Imports a specific collection from an org-app combo.
-     */
-    private void importCollectionFromOrgApp(UUID applicationUUID, final Map<String, Object> config,
-                                            final JobExecution jobExecution, S3Import s3Import) throws Exception {
-
-        //retrieves import entity
-        Import importUG = getImportEntity(jobExecution);
-        ApplicationInfo application = managementService.getApplicationInfo(applicationUUID);
-
-        if (application == null) {
-            throw new ApplicationNotFoundException("Application Not Found");
-        }
-
-        String collectionName = config.get("collectionName").toString();
-
-        // prepares the prefix path for the files to be imported depending on the endpoint being hit
-        String appFileName = prepareInputFileName("application", application.getName(), collectionName);
-
-        files = fileTransfer(importUG, appFileName, config, s3Import, 0);
-
-    }
-
-    /**
-     * Imports a specific applications from an organization
-     */
-    private void importApplicationFromOrg(UUID organizationUUID, UUID applicationId, final Map<String, Object> config,
-                                          final JobExecution jobExecution, S3Import s3Import) throws Exception {
-
-        //retrieves import entity
-        Import importUG = getImportEntity(jobExecution);
-
-        ApplicationInfo application = managementService.getApplicationInfo(applicationId);
-
-        if (application == null) {
-            throw new ApplicationNotFoundException("Application Not Found");
-        }
-
-        // prepares the prefix path for the files to be imported depending on the endpoint being hit
-        String appFileName = prepareInputFileName("application", application.getName(), null);
-
-        files = fileTransfer(importUG, appFileName, config, s3Import, 1);
-
-    }
-
-    /**
-     * Imports All Applications from an Organization
-     */
-    private void importApplicationsFromOrg(UUID organizationUUID, final Map<String, Object> config,
-                                           final JobExecution jobExecution, S3Import s3Import) throws Exception {
-
-        // retrieves import entity
-        Import importUG = getImportEntity(jobExecution);
-        String appFileName = null;
-
-        OrganizationInfo organizationInfo = managementService.getOrganizationByUuid(organizationUUID);
-        if (organizationInfo == null) {
-            throw new OrganizationNotFoundException("Organization Not Found");
-        }
-
-        // prepares the prefix path for the files to be imported depending on the endpoint being hit
-        appFileName = prepareInputFileName("organization", organizationInfo.getName(), null);
-
-        files = fileTransfer(importUG, appFileName, config, s3Import, 2);
-
-    }
-
-    /**
-     * prepares the prefix path for the files to be imported depending on the endpoint being hit
-     * @param type just a label such us: organization, application.
-     * @return the file name concatenated with the type and the name of the collection
-     */
-    protected String prepareInputFileName(String type, String name, String CollectionName) {
-        StringBuilder str = new StringBuilder();
-
-        // in case of type organization --> the file name will be "<org_name>/"
-        if (type.equals("organization")) {
-
-            str.append(name);
-            str.append("/");
-
-        } else if (type.equals("application")) {
-
-            // in case of type application --> the file name will be "<org_name>/<app_name>."
-            str.append(name);
-            str.append(".");
-
-            if (CollectionName != null) {
-
-                // in case of type application and collection import --> the file name will be "<org_name>/<app_name>.<collection_name>."
-                str.append(CollectionName);
-                str.append(".");
-
-            }
-        }
-
-        String inputFileName = str.toString();
-
-        return inputFileName;
-    }
-
-    /**
-     * @param importUG    Import instance
-     * @param appFileName the base file name for the files to be downloaded
-     * @param config      the config information for the import job
-     * @param s3Import    s3import instance
-     * @param type        it indicates the type of import. 0 - Collection , 1 - Application and 2 - Organization
-     * @return
-     */
-    public ArrayList<File> fileTransfer(Import importUG, String appFileName, Map<String, Object> config,
-                                        S3Import s3Import, int type) throws Exception {
-
-        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-        ArrayList<File> files = new ArrayList<File>();
-
-        try {
-            files = s3Import.copyFromS3(config, appFileName, type);
-        } catch (Exception e) {
-            importUG.setErrorMessage(e.getMessage());
-            importUG.setState(Import.State.FAILED);
-            rootEm.update(importUG);
-        }
-        return files;
-    }
-
-    /**
-     * The loops through each temp file and parses it to store the entities from the json back into usergrid
-     *
-     * @throws Exception
-     */
-    @Override
-    public void FileParser(JobExecution jobExecution) throws Exception {
-
-        // add properties to the import entity
-        FileImport fileImport = getFileImportEntity(jobExecution);
-
-        File file = new File(jobExecution.getJobData().getProperty("File").toString());
-
-        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-        rootEm.update(fileImport);
-
-        boolean completed = fileImport.getCompleted();
-
-        // on resume, completed files will not be traversed again
-        if (!completed) {
-
-            // validates the JSON structure
-            if (isValidJSON(file, rootEm, fileImport)) {
-
-                // mark the File import job as started
-                fileImport.setState(FileImport.State.STARTED);
-                rootEm.update(fileImport);
-
-                // gets the application anme from the filename
-                String applicationName = file.getPath().split("\\.")[0];
-
-                ApplicationInfo application = managementService.getApplicationInfo(applicationName);
-
-                JsonParser jp = getJsonParserForFile(file);
-
-                // incase of resume, retrieve the last updated UUID for this file
-                String lastUpdatedUUID = fileImport.getLastUpdatedUUID();
-
-                // this handles partially completed files by updating entities from the point of failure
-                if (!lastUpdatedUUID.equals(" ")) {
-
-                    // go till the last updated entity
-                    while (!jp.getText().equals(lastUpdatedUUID)) {
-                        jp.nextToken();
-                    }
-
-                    // skip the last one and start from the next one
-                    while (!(jp.getCurrentToken() == JsonToken.END_OBJECT && jp.nextToken() == JsonToken.START_OBJECT)) {
-                        jp.nextToken();
-                    }
-                }
-
-                // get to start of an object i.e next entity.
-                while (jp.getCurrentToken() != JsonToken.START_OBJECT) {
-                    jp.nextToken();
-                }
-
-                // get entity manager for the application
-                EntityManager em = emf.getEntityManager(application.getId());
-
-                while (jp.nextToken() != JsonToken.END_ARRAY) {
-                    // import the entities in this file
-                    importEntityStuff(jp, em, rootEm, fileImport, jobExecution);
-                }
-                jp.close();
-
-                // Updates the state of file import job
-                if (!fileImport.getState().equals("FAILED")) {
-
-                    // mark file as completed
-                    fileImport.setCompleted(true);
-                    fileImport.setState(FileImport.State.FINISHED);
-                    rootEm.update(fileImport);
-
-                    //check other files status and mark the status of import Job as Finished if all others are finished
-                    Results ImportJobResults = rootEm.getConnectingEntities(fileImport, "includes", null, Level.ALL_PROPERTIES);
-                    List<Entity> importEntity = ImportJobResults.getEntities();
-                    UUID importId = importEntity.get(0).getUuid();
-                    Import importUG = rootEm.get(importId, Import.class);
-
-                    Results entities = rootEm.getConnectedEntities( importUG, "includes", null, Level.ALL_PROPERTIES);
-                    List<Entity> importFile = entities.getEntities();
-
-                    int count = 0;
-                    for (Entity eachEntity : importFile) {
-                        FileImport fi = rootEm.get(eachEntity.getUuid(), FileImport.class);
-                        if (fi.getState().toString().equals("FINISHED")) {
-                            count++;
-                        } else if (fi.getState().toString().equals("FAILED")) {
-                            importUG.setState(Import.State.FAILED);
-                            rootEm.update(importUG);
-                            break;
-                        }
-                    }
-                    if (count == importFile.size()) {
-                        importUG.setState(Import.State.FINISHED);
-                        rootEm.update(importUG);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Checks if a file is a valid JSON
-     * @param collectionFile the file being validated
-     * @param rootEm    the Entity Manager for the Management application
-     * @param fileImport the file import entity
-     * @return
-     * @throws Exception
-     */
-    private boolean isValidJSON(File collectionFile, EntityManager rootEm, FileImport fileImport) throws Exception {
-
-        boolean valid = false;
-        try {
-            final JsonParser jp = jsonFactory.createJsonParser(collectionFile);
-            while (jp.nextToken() != null) {
-            }
-            valid = true;
-        } catch (JsonParseException e) {
-            e.printStackTrace();
-            fileImport.setErrorMessage(e.getMessage());
-            rootEm.update(fileImport);
-        } catch (IOException e) {
-            fileImport.setErrorMessage(e.getMessage());
-            rootEm.update(fileImport);
-        }
-        return valid;
-    }
-
-
-    /**
-     * Gets the JSON parser for given file
-     * @param collectionFile the file for which JSON parser is required
-     * @return
-     * @throws Exception
-     */
-    private JsonParser getJsonParserForFile(File collectionFile) throws Exception {
-        JsonParser jp = jsonFactory.createJsonParser(collectionFile);
-        jp.setCodec(new ObjectMapper());
-        return jp;
-    }
-
-
-
-    /**
-     * Imports the entity's connecting references (collections, connections and dictionaries)
-     * @param jp  JsonParser pointing to the beginning of the object.
-     * @param em Entity Manager for the application being imported
-     * @param rootEm Entity manager for the root applicaition
-     * @param fileImport the file import entity
-     * @param jobExecution  execution details for the import jbo
-     * @throws Exception
-     */
-    private void importEntityStuff(final JsonParser jp, final EntityManager em, final EntityManager rootEm, final FileImport fileImport, final JobExecution jobExecution) throws Exception {
-
-        final JsonParserObservable subscribe = new JsonParserObservable(jp, em, rootEm, fileImport);
-
-        final Observable<WriteEvent> observable = Observable.create(subscribe);
-
-        /**
-         * This is the action we want to perform for every UUID we receive
-         */
-        final Action1<WriteEvent> doWork = new Action1<WriteEvent>() {
-            @Override
-            public void call(WriteEvent writeEvent) {
-                writeEvent.doWrite(em, jobExecution, fileImport);
-            }
-
-        };
-
-
-        final AtomicLong entityCounter = new AtomicLong();
-        final AtomicLong eventCounter = new AtomicLong();
-        /**
-         * This is boilerplate glue code.  We have to follow this for the parallel operation.  In the "call"
-         * method we want to simply return the input observable + the chain of operations we want to invoke
-         */
-        observable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
-            @Override
-            public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
-
-                /* TODO:
-                 * need to fixed so that number of entities created can be counted correctly
-                 * and also update the last updated UUID for the fileImport which is a must for resumability
-                 */
-//                return entityWrapperObservable.doOnNext(doWork).doOnNext(new Action1<WriteEvent>() {
-//
-//                         @Override
-//                         public void call(WriteEvent writeEvent) {
-//                             if (!(writeEvent instanceof EntityEvent)) {
-//                                 final long val = eventCounter.incrementAndGet();
-//                                 if(val % 50 == 0) {
-//                                     jobExecution.heartbeat();
-//                                 }
-//                                 return;
-//                             }
-//
-//                             final long value = entityCounter.incrementAndGet();
-//                             if (value % 2000 == 0) {
-//                                 try {
-//                                     logger.error("UUID = " +((EntityEvent) writeEvent).getEntityUuid().toString() + " value = " + value +"");
-//                                     fileImport.setLastUpdatedUUID(((EntityEvent) writeEvent).getEntityUuid().toString());
-//                                     //checkpoint the UUID here.
-//                                     rootEm.update(fileImport);
-//                                 } catch(Exception ex) {}
-//                             }
-//                             if(value % 100 == 0) {
-//                                 logger.error("heartbeat sent by " + fileImport.getFileName());
-//                                 jobExecution.heartbeat();
-//                             }
-//                         }
-//                     }
-//                );
-
-                return entityWrapperObservable.doOnNext(doWork);
-            }
-        }, Schedulers.io()).toBlocking().last();
-    }
-
-    private interface WriteEvent {
-        public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport);
-    }
-
-    private final class EntityEvent implements WriteEvent {
-        UUID entityUuid;
-        String entityType;
-        Map<String, Object> properties;
-
-        EntityEvent(UUID entityUuid, String entityType, Map<String, Object> properties) {
-            this.entityUuid = entityUuid;
-            this.entityType = entityType;
-            this.properties = properties;
-        }
-
-        public UUID getEntityUuid() {
-            return entityUuid;
-        }
-
-        // Creates entities
-        @Override
-        public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
-            EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-
-            try {
-                em.create(entityUuid, entityType, properties);
-            } catch (Exception e) {
-                fileImport.setErrorMessage(e.getMessage());
-                try {
-                    rootEm.update(fileImport);
-                } catch (Exception ex) {
-                }
-            }
-        }
-    }
-
-    private final class ConnectionEvent implements WriteEvent {
-        EntityRef ownerEntityRef;
-        String connectionType;
-        EntityRef entryRef;
-
-        ConnectionEvent(EntityRef ownerEntityRef, String connectionType, EntityRef entryRef) {
-            this.ownerEntityRef = ownerEntityRef;
-            this.connectionType = connectionType;
-            this.entryRef = entryRef;
-
-        }
-
-        // creates connections between entities
-        @Override
-        public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
-            EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-
-            try {
-                em.createConnection(ownerEntityRef, connectionType, entryRef);
-            } catch (Exception e) {
-                fileImport.setErrorMessage(e.getMessage());
-                try {
-                    rootEm.update(fileImport);
-                } catch (Exception ex) {
-                }
-            }
-        }
-    }
-
-    private final class DictionaryEvent implements WriteEvent {
-
-        EntityRef ownerEntityRef;
-        String dictionaryName;
-        Map<String, Object> dictionary;
-
-        DictionaryEvent(EntityRef ownerEntityRef, String dictionaryName, Map<String, Object> dictionary) {
-            this.ownerEntityRef = ownerEntityRef;
-            this.dictionaryName = dictionaryName;
-            this.dictionary = dictionary;
-        }
-
-        // adds map to the dictionary
-        @Override
-        public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
-            EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-            try {
-                em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
-            } catch (Exception e) {
-                fileImport.setErrorMessage(e.getMessage());
-                try {
-                    rootEm.update(fileImport);
-                } catch (Exception ex) {
-                }
-            }
-        }
-    }
-
-
-    private final class JsonParserObservable implements Observable.OnSubscribe<WriteEvent> {
-        private final JsonParser jp;
-        EntityManager em;
-        EntityManager rootEm;
-        FileImport fileImport;
-
-
-        JsonParserObservable(JsonParser parser, EntityManager em, EntityManager rootEm, FileImport fileImport) {
-            this.jp = parser;
-            this.em = em;
-            this.rootEm = rootEm;
-            this.fileImport = fileImport;
-        }
-
-        @Override
-        public void call(final Subscriber<? super WriteEvent> subscriber) {
-
-            WriteEvent entityWrapper = null;
-            EntityRef ownerEntityRef = null;
-            String entityUuid = "";
-            String entityType = "";
-            try {
-                while (!subscriber.isUnsubscribed() && jp.nextToken() != JsonToken.END_OBJECT) {
-                    String collectionName = jp.getCurrentName();
-
-                    // create the  wrapper for connections
-                    if (collectionName.equals("connections")) {
-
-                        jp.nextToken(); // START_OBJECT
-                        while (jp.nextToken() != JsonToken.END_OBJECT) {
-                            String connectionType = jp.getCurrentName();
-
-                            jp.nextToken(); // START_ARRAY
-                            while (jp.nextToken() != JsonToken.END_ARRAY) {
-                                String entryId = jp.getText();
-
-                                EntityRef entryRef = new SimpleEntityRef(UUID.fromString(entryId));
-                                entityWrapper = new ConnectionEvent(ownerEntityRef, connectionType, entryRef);
-
-                                // Creates a new subscriber to the observer with the given connection wrapper
-                                subscriber.onNext(entityWrapper);
-                            }
-                        }
-
-                    }
-                    // create the  wrapper for dictionaries
-                    else if (collectionName.equals("dictionaries")) {
-
-                        jp.nextToken(); // START_OBJECT
-                        while (jp.nextToken() != JsonToken.END_OBJECT) {
-
-                            String dictionaryName = jp.getCurrentName();
-
-                            jp.nextToken();
-
-                            Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
-                            entityWrapper = new DictionaryEvent(ownerEntityRef, dictionaryName, dictionary);
-
-                            // Creates a new subscriber to the observer with the given dictionary wrapper
-                            subscriber.onNext(entityWrapper);
-                        }
-                        subscriber.onCompleted();
-
-                    } else {
-
-                        // Regular collections
-                        jp.nextToken(); // START_OBJECT
-
-                        Map<String, Object> properties = new HashMap<String, Object>();
-                        JsonToken token = jp.nextToken();
-
-                        while (token != JsonToken.END_OBJECT) {
-                            if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) {
-                                String key = jp.getCurrentName();
-                                if (key.equals("uuid")) {
-                                    entityUuid = jp.getText();
-
-                                } else if (key.equals("type")) {
-                                    entityType = jp.getText();
-                                } else if (key.length() != 0 && jp.getText().length() != 0) {
-                                    String value = jp.getText();
-                                    properties.put(key, value);
-                                }
-                            }
-                            token = jp.nextToken();
-                        }
-
-                        ownerEntityRef = new SimpleEntityRef(entityType, UUID.fromString(entityUuid));
-                        entityWrapper = new EntityEvent(UUID.fromString(entityUuid), entityType, properties);
-
-                        // Creates a new subscriber to the observer with the given dictionary wrapper
-                        subscriber.onNext(entityWrapper);
-
-                    }
-                }
-            } catch (Exception e) {
-                // skip illegal entity UUID and go to next one
-                fileImport.setErrorMessage(e.getMessage());
-                try {
-                    rootEm.update(fileImport);
-                } catch (Exception ex) {
-                }
-                subscriber.onError(e);
-            }
-        }
-    }
-}
-
-/**
- * Custom Exception class for Organization Not Found
- */
-class OrganizationNotFoundException extends Exception {
-    OrganizationNotFoundException(String s) {
-        super(s);
-    }
-}
-
-/**
- * Custom Exception class for Application Not Found
- */
-class ApplicationNotFoundException extends Exception {
-    ApplicationNotFoundException(String s) {
-        super(s);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/main/java/org/apache/usergrid/management/importUG/S3Import.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importUG/S3Import.java b/stack/services/src/main/java/org/apache/usergrid/management/importUG/S3Import.java
deleted file mode 100644
index f687b38..0000000
--- a/stack/services/src/main/java/org/apache/usergrid/management/importUG/S3Import.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.usergrid.management.importug;
-
-import java.util.ArrayList;
-import java.util.Map;
-import java.io.File;
-
-/**
- * Created by ApigeeCorporation on 7/8/14.
- */
-
-// interface for S3ImportImpl
-public interface S3Import {
-    ArrayList<File> copyFromS3(Map<String, Object> exportInfo, String filename, int type);
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/main/java/org/apache/usergrid/management/importUG/S3ImportImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importUG/S3ImportImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importUG/S3ImportImpl.java
deleted file mode 100644
index 53a77d9..0000000
--- a/stack/services/src/main/java/org/apache/usergrid/management/importUG/S3ImportImpl.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.usergrid.management.importug;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.Module;
-import org.jclouds.ContextBuilder;
-import org.jclouds.blobstore.BlobStore;
-import org.jclouds.blobstore.BlobStoreContext;
-import org.jclouds.blobstore.domain.Blob;
-import org.jclouds.blobstore.domain.MutableBlobMetadata;
-import org.jclouds.blobstore.domain.PageSet;
-import org.jclouds.blobstore.domain.StorageMetadata;
-import org.jclouds.blobstore.options.ListContainerOptions;
-import org.jclouds.http.config.JavaUrlHttpCommandExecutorServiceModule;
-import org.jclouds.logging.log4j.config.Log4JLoggingModule;
-import org.jclouds.netty.config.NettyPayloadModule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-
-
-public class S3ImportImpl implements S3Import {
-
-    private BlobStore blobStore;
-    private ArrayList<Blob> blobs = new ArrayList<Blob>();
-    private ArrayList<File> files = new ArrayList<File>();
-    private int i=0;
-
-    /**
-     * Downloads the files from s3 into temp local files
-     * @param importInfo the information entered by the user required to perform import from S3
-     * @param filename the filename generated based on the request URI
-     * @param type  it indicates the type of import. 0 - Collection , 1 - Application and 2 - Organization
-     * @return It returns an ArrayList of files i.e. the files downloaded from s3
-     */
-    public ArrayList<File> copyFromS3( final Map<String,Object> importInfo, String filename , int type) {
-
-        Map<String,Object> properties = ( Map<String, Object> ) importInfo.get( "properties" );
-
-        Map<String, Object> storage_info = (Map<String,Object>)properties.get( "storage_info" );
-
-        String bucketName = ( String ) storage_info.get( "bucket_location" );
-        String accessId = ( String ) storage_info.get( "s3_access_id" );
-        String secretKey = ( String ) storage_info.get( "s3_key" );
-
-        Properties overrides = new Properties();
-        overrides.setProperty( "s3" + ".identity", accessId );
-        overrides.setProperty( "s3" + ".credential", secretKey );
-
-        final Iterable<? extends Module> MODULES = ImmutableSet
-                .of(new JavaUrlHttpCommandExecutorServiceModule(), new Log4JLoggingModule(),
-                        new NettyPayloadModule());
-
-        BlobStoreContext context =
-                ContextBuilder.newBuilder("s3").credentials( accessId, secretKey ).modules( MODULES )
-                        .overrides( overrides ).buildView( BlobStoreContext.class );
-
-        try{
-
-            blobStore = context.getBlobStore();
-
-            // gets all the files in the bucket recursively
-            PageSet<? extends StorageMetadata> pageSet = blobStore.list(bucketName, new ListContainerOptions().recursive());
-
-            Iterator itr = pageSet.iterator();
-
-            while(itr.hasNext())
-            {
-                String fname = ((MutableBlobMetadata)itr.next()).getName();
-                switch(type) {
-                    // check if file is a collection file and is in format <org_name>/<app_name>.<collection_name>.[0-9]+.json
-                    case 0:
-                        if(fname.contains(filename))
-                        {
-                            copyFile(bucketName,fname);
-                            i++;
-                        }
-                        break;
-                    // check if file is an application file and is in format <org_name>/<app_name>.[0-9]+.json
-                    case 1:
-                        if(fname.matches(filename+"[0-9]+\\.json"))
-                        {
-                            copyFile(bucketName,fname);
-                            i++;
-                        }
-                        break;
-                    // check if file is an application file and is in format <org_name>/[-a-zA-Z0-9]+.[0-9]+.json
-                    case 2:
-                        if(fname.matches(filename+"[-a-zA-Z0-9]+\\.[0-9]+\\.json"))
-                        {
-                            copyFile(bucketName,fname);
-                            i++;
-                        }
-                        break;
-                }
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        return files;
-    }
-
-    /**
-     * Copy the file from s3 into a temp local file
-     * @param bucketName the S3 bucket name from where files need to be imported
-     * @param fname the filename by which the temp file should be created
-     * @throws IOException
-     */
-    void copyFile(String bucketName, String fname) throws IOException {
-
-        Logger logger = LoggerFactory.getLogger(ImportServiceImpl.class);
-        Blob blob = blobStore.getBlob(bucketName, fname);
-        blobs.add(blob);
-        String[] fileOrg = fname.split("/");
-        File organizationDirectory = new File(fileOrg[0]);
-
-        if (!organizationDirectory.exists()) {
-            try {
-                organizationDirectory.mkdir();
-            }catch(SecurityException se) {
-                logger.error(se.getMessage());
-            }
-        }
-
-        File ephemeral = new File(fname);
-
-        FileOutputStream fop = new FileOutputStream(ephemeral);
-
-        blobs.get(i).getPayload().writeTo(fop);
-
-        files.add(ephemeral);
-
-        organizationDirectory.deleteOnExit();
-        ephemeral.deleteOnExit();
-        fop.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
new file mode 100644
index 0000000..f5f8ac0
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.management.importer;
+
+import org.apache.usergrid.batch.JobExecution;
+import org.apache.usergrid.batch.job.OnlyOnceJob;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.entities.FileImport;
+import org.apache.usergrid.persistence.entities.Import;
+import org.apache.usergrid.persistence.entities.JobData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import java.util.List;
+import java.util.UUID;
+import static org.apache.usergrid.corepersistence.CpEntityManagerFactory.MANAGEMENT_APPLICATION_ID;
+import org.apache.usergrid.persistence.index.query.Query.Level;
+
+
+/**
+ * Scheduler job that imports a single file. The work itself is delegated to
+ * {@link ImportService#FileParser(JobExecution)}; if the scheduler gives up on the job,
+ * {@link #dead(JobExecution)} marks the file import and its parent import as failed.
+ */
+@Component("fileImportJob")
+public class FileImportJob extends OnlyOnceJob {
+
+    public static final String FILE_IMPORT_ID = "fileImportId";
+    private static final Logger logger = LoggerFactory.getLogger(FileImportJob.class);
+
+    // Needed by dead(). Nothing in this class assigns the field, so it has to be
+    // injected; without @Autowired dead() would fail with a NullPointerException.
+    @Autowired
+    protected EntityManagerFactory emf;
+
+    @Autowired
+    ImportService importService;
+
+    public FileImportJob() {
+        logger.info( "FileImportJob created {}", this );
+    }
+
+    /**
+     * Runs the file import for the given execution.
+     *
+     * @param jobExecution execution of this job; when it carries no job data the
+     *                     problem is logged and nothing is imported
+     * @throws Exception if the import service call fails
+     */
+    @Override
+    protected void doJob(JobExecution jobExecution) throws Exception {
+        logger.info( "execute FileImportJob {}", jobExecution );
+
+        JobData jobData = jobExecution.getJobData();
+        if ( jobData == null ) {
+            logger.error( "jobData cannot be null" );
+            return;
+        }
+
+        // heartbeat to indicate job has started
+        jobExecution.heartbeat();
+
+        // call the File Parser for the file set in job execution
+        importService.FileParser( jobExecution );
+
+        // normal completion is not an error, so it is reported at INFO
+        logger.info("File Import Service completed job");
+    }
+
+    /**
+     * @return the fixed delay of 100 (unit is defined by the scheduler; presumably ms - confirm)
+     */
+    @Override
+    protected long getDelay(JobExecution execution) throws Exception {
+        return 100;
+    }
+
+    @Autowired
+    public void setImportService( final ImportService importService ) {
+        this.importService = importService;
+    }
+
+    /**
+     * This method is called when the job is retried maximum times by the scheduler but still fails.
+     * Thus the scheduler marks it as DEAD.
+     *
+     * The file import is marked FAILED first; the import that "includes" it is then marked
+     * FAILED as well, provided one can be found.
+     */
+    @Override
+    public void dead( final JobExecution execution ) throws Exception {
+
+        // Get the root entity manager
+        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+
+        // Mark the sub-job i.e. File Import Job as Failed
+        FileImport fileImport = importService.getFileImportEntity(execution);
+        fileImport.setErrorMessage("The Job has been tried maximum times but still failed");
+        fileImport.setState(FileImport.State.FAILED);
+        rootEm.update(fileImport);
+
+        // If one file Job fails, mark the main import Job also as failed
+        Results importJobResults = rootEm.getConnectingEntities(
+                fileImport, "includes", null, Level.ALL_PROPERTIES);
+        List<Entity> importEntities = importJobResults.getEntities();
+        if ( importEntities == null || importEntities.isEmpty() ) {
+            // no parent import is connected, so there is nothing further to mark
+            logger.error( "No import entity found for the failed file import" );
+            return;
+        }
+
+        UUID importId = importEntities.get(0).getUuid();
+        Import importUG = rootEm.get(importId, Import.class);
+        if ( importUG == null ) {
+            logger.error( "Import entity {} could not be loaded", importId );
+            return;
+        }
+        importUG.setState(Import.State.FAILED);
+        rootEm.update(importUG);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
new file mode 100644
index 0000000..cab6a37
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.management.importer;
+
+import org.apache.usergrid.batch.JobExecution;
+import org.apache.usergrid.batch.job.OnlyOnceJob;
+import static org.apache.usergrid.corepersistence.CpEntityManagerFactory.MANAGEMENT_APPLICATION_ID;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.entities.Import;
+import org.apache.usergrid.persistence.entities.JobData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+
+/**
+ * Scheduler job that runs an import by delegating to
+ * {@link ImportService#doImport(JobExecution)}; if the scheduler gives up on the job,
+ * {@link #dead(JobExecution)} marks the import as failed.
+ */
+@Component("importJob")
+public class ImportJob extends OnlyOnceJob {
+
+    public static final String IMPORT_ID = "importId";
+    private static final Logger logger = LoggerFactory.getLogger(ImportJob.class);
+
+    // Needed by dead(). Nothing in this class assigns the field, so it has to be
+    // injected; without @Autowired dead() would fail with a NullPointerException.
+    @Autowired
+    protected EntityManagerFactory emf;
+
+    @Autowired
+    ImportService importService;
+
+    public ImportJob() {
+        logger.info( "ImportJob created {}", this );
+    }
+
+    /**
+     * Runs the import for the given execution.
+     *
+     * @param jobExecution execution of this job; when it carries no job data the
+     *                     problem is logged and nothing is imported
+     * @throws Exception if the import service call fails
+     */
+    @Override
+    protected void doJob(JobExecution jobExecution) throws Exception {
+        logger.info( "execute ImportJob {}", jobExecution );
+
+        JobData jobData = jobExecution.getJobData();
+        if ( jobData == null ) {
+            logger.error( "jobData cannot be null" );
+            return;
+        }
+
+        // heartbeat to indicate job has started
+        jobExecution.heartbeat();
+
+        // call the doImport method from import service which schedules the sub-jobs i.e. parsing of files to FileImport Job
+        importService.doImport( jobExecution );
+
+        // normal completion is not an error, so it is reported at INFO
+        logger.info("Import Service completed job");
+    }
+
+    /**
+     * @return the fixed delay of 100 (unit is defined by the scheduler; presumably ms - confirm)
+     */
+    @Override
+    protected long getDelay(JobExecution execution) throws Exception {
+        return 100;
+    }
+
+    @Autowired
+    public void setImportService( final ImportService importService ) {
+        this.importService = importService;
+    }
+
+    /**
+     * This method is called when the job is retried maximum times by the scheduler but still fails.
+     * Thus the scheduler marks it as DEAD.
+     */
+    @Override
+    public void dead( final JobExecution execution ) throws Exception {
+
+        // marks the job as failed as it will not be retried by the scheduler.
+        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+        Import importUG = importService.getImportEntity(execution);
+        importUG.setErrorMessage("The Job has been tried maximum times but still failed");
+        importUG.setState(Import.State.FAILED);
+        rootEm.update(importUG);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
new file mode 100644
index 0000000..c4bb89e
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.management.importer;
+
+import org.apache.usergrid.batch.JobExecution;
+import org.apache.usergrid.persistence.entities.FileImport;
+import org.apache.usergrid.persistence.entities.Import;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Performs all functions related to importing: scheduling, running and inspecting import jobs
+ */
+public interface ImportService {
+
+    /**
+     * Schedules the import described by {@code json}; the returned UUID presumably identifies it (confirm)
+     */
+    UUID schedule(Map<String, Object> json) throws Exception;
+
+    /**
+     * Perform the import from the external resource; ImportJob#doJob calls this to schedule the per-file sub-jobs
+     */
+    void doImport(JobExecution jobExecution) throws Exception;
+
+    /**
+     * Parses the input file and creates entities
+     *
+     * @param jobExecution execution of the file-import job (FileImportJob#doJob passes its own execution)
+     * @throws Exception presumably on parse or persistence failure - confirm against the implementation
+     */
+    void FileParser(JobExecution jobExecution) throws Exception;
+
+    /**
+     * Get the state for the Job with UUID
+     * @param uuid Job UUID
+     * @return State of Job
+     * @throws Exception presumably when the job cannot be looked up - confirm against the implementation
+     */
+    String getState(UUID uuid) throws Exception;
+
+    /**
+     * Returns error message for the job with UUID
+     * @param uuid Job UUID
+     * @return error message
+     * @throws Exception presumably when the job cannot be looked up - confirm against the implementation
+     */
+    String getErrorMessage(UUID uuid) throws Exception;
+
+    /**
+     * Returns all the temp files downloaded from s3
+     * @return the list of downloaded files from S3.
+     */
+    ArrayList<File> getEphemeralFile();
+
+    /**
+     * @param jobExecution execution of a file-import job (FileImportJob#dead passes the dead job's execution)
+     * @return the FileImport entity belonging to that execution
+     * @throws Exception presumably when the entity cannot be resolved - confirm against the implementation
+     */
+    FileImport getFileImportEntity(final JobExecution jobExecution) throws Exception;
+
+    /**
+     * @param jobExecution execution of an import job (ImportJob#dead passes the dead job's execution)
+     * @return the Import entity belonging to that execution
+     * @throws Exception presumably when the entity cannot be resolved - confirm against the implementation
+     */
+    Import getImportEntity(final JobExecution jobExecution) throws Exception;
+
+
+}