You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/10 21:26:02 UTC

[09/50] [abbrv] git commit: Added heartbeats to ExportService. Sends one between writing entities every time one takes more than 5 seconds.

Added heartbeats to ExportService. A heartbeat is sent between entity writes whenever more than 5 seconds have elapsed since the last one.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bc70a3dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bc70a3dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bc70a3dd

Branch: refs/pull/70/merge
Commit: bc70a3dd87a6b3970eac4d0f0349ddd93653d646
Parents: 9a16d8f
Author: George Reyes <Ap...@George-Reyess-MacBook-Pro.local>
Authored: Thu Feb 13 14:53:40 2014 -0800
Committer: George Reyes <Ap...@George-Reyess-MacBook-Pro.local>
Committed: Thu Feb 13 14:53:40 2014 -0800

----------------------------------------------------------------------
 .../usergrid/management/export/ExportJob.java   |   2 +-
 .../management/export/ExportService.java        |   3 +-
 .../management/export/ExportServiceImpl.java    | 136 ++++++++-----------
 3 files changed, 63 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bc70a3dd/stack/services/src/main/java/org/usergrid/management/export/ExportJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/usergrid/management/export/ExportJob.java b/stack/services/src/main/java/org/usergrid/management/export/ExportJob.java
index bd72c6e..ec5f27b 100644
--- a/stack/services/src/main/java/org/usergrid/management/export/ExportJob.java
+++ b/stack/services/src/main/java/org/usergrid/management/export/ExportJob.java
@@ -39,7 +39,7 @@ public class ExportJob extends OnlyOnceJob {
 
         jobExecution.heartbeat();
 //pass in jobExecution so that you can call the heartbeat in the do export method.
-        exportService.doExport( config );
+        exportService.doExport( config, jobExecution );
 
         logger.info( "executed ExportJob completed normally" );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bc70a3dd/stack/services/src/main/java/org/usergrid/management/export/ExportService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/usergrid/management/export/ExportService.java b/stack/services/src/main/java/org/usergrid/management/export/ExportService.java
index e2e5d54..aa1dd1b 100644
--- a/stack/services/src/main/java/org/usergrid/management/export/ExportService.java
+++ b/stack/services/src/main/java/org/usergrid/management/export/ExportService.java
@@ -3,6 +3,7 @@ package org.usergrid.management.export;
 
 import java.util.UUID;
 
+import org.usergrid.batch.JobExecution;
 import org.usergrid.management.ExportInfo;
 
 
@@ -23,7 +24,7 @@ public interface ExportService {
      * Perform the export to the external resource
      * @param config
      */
-    void doExport(ExportInfo config) throws Exception;
+    void doExport(ExportInfo config, JobExecution jobExecution) throws Exception;
 
     /**
      * Returns the UUID to the user

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bc70a3dd/stack/services/src/main/java/org/usergrid/management/export/ExportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/usergrid/management/export/ExportServiceImpl.java b/stack/services/src/main/java/org/usergrid/management/export/ExportServiceImpl.java
index 5f532c7..dbbe520 100644
--- a/stack/services/src/main/java/org/usergrid/management/export/ExportServiceImpl.java
+++ b/stack/services/src/main/java/org/usergrid/management/export/ExportServiceImpl.java
@@ -19,6 +19,7 @@ import org.codehaus.jackson.util.DefaultPrettyPrinter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.usergrid.batch.JobExecution;
 import org.usergrid.batch.service.SchedulerService;
 import org.usergrid.management.ExportInfo;
 import org.usergrid.management.ManagementService;
@@ -40,7 +41,7 @@ import com.google.common.collect.BiMap;
  *
  *
  */
-public class ExportServiceImpl implements ExportService{
+public class ExportServiceImpl implements ExportService {
 
 
     private static final Logger logger = LoggerFactory.getLogger( ExportServiceImpl.class );
@@ -57,6 +58,9 @@ public class ExportServiceImpl implements ExportService{
     //Maximum amount of entities retrieved in a single go.
     public static final int MAX_ENTITY_FETCH = 100;
 
+    //Minimum time, in milliseconds, that must elapse before checkTimeDelta sends another heartbeat
+    public static final int TIMESTAMP_DELTA = 5000;
+
     private JsonFactory jsonFactory = new JsonFactory();
 
     private String outputDir = "/Users/ApigeeCorportation";
@@ -101,17 +105,17 @@ public class ExportServiceImpl implements ExportService{
         // good error messages when not found.
 
         //validate user has access key to org (rather valid user has admin access token)
-            //this is token validation
+        //this is token validation
         JobData jobData = new JobData();
 
-        jobData.setProperty( "exportInfo",config );
+        jobData.setProperty( "exportInfo", config );
         long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
-        JobData retJobData = sch.createJob( "exportJob",soonestPossible, jobData );
+        JobData retJobData = sch.createJob( "exportJob", soonestPossible, jobData );
         jobUUID = retJobData.getUuid();
 
         try {
             JobStat merp = sch.getStatsForJob( "exportJob", retJobData.getUuid() );
-            System.out.println("hi");
+            System.out.println( "hi" );
         }
         catch ( Exception e ) {
             logger.error( "could not get stats for job" );
@@ -120,24 +124,23 @@ public class ExportServiceImpl implements ExportService{
 
 
     @Override
-    public void doExport( final ExportInfo config ) throws Exception {
+    public void doExport( final ExportInfo config, final JobExecution jobExecution ) throws Exception {
 
         Map<UUID, String> organizations = getOrgs();
         for ( Map.Entry<UUID, String> organization : organizations.entrySet() ) {
 
-            exportApplicationsForOrg( organization , config );
+            exportApplicationsForOrg( organization, config, jobExecution );
         }
     }
 
 
-
     private Map<UUID, String> getOrgs() throws Exception {
         // Loop through the organizations
-       // TODO:this will come from the orgs in schedule when you do the validations. delete orgId
+        // TODO:this will come from the orgs in schedule when you do the validations. delete orgId
         UUID orgId = null;
 
         Map<UUID, String> organizationNames = null;
-       // managementService.setup();
+        // managementService.setup();
 
 
         if ( orgId == null ) {
@@ -193,47 +196,29 @@ public class ExportServiceImpl implements ExportService{
         this.managementService = managementService;
     }
 
-    public UUID getJobUUID () {
+
+    public UUID getJobUUID() {
         return jobUUID;
     }
-//write test checking to see what happens if the input stream is closed or wrong.
-//TODO: make multipart streaming functional
-    //currently only stores the collection in memory then flushes it.
-    private void exportApplicationsForOrg( Map.Entry<UUID, String> organization,final ExportInfo config ) throws Exception {
-
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        //baos.reset();
-        //ObjectOutputStream oos = new ObjectOutputStream(baos);
-
-//        OutputStreamWriter osw = new OutputStreamWriter(oos,"UTF-8");
-//        PrintWriter out = new PrintWriter( osw );
-//
-//        //oos.reset();
-//
-//        Writer wrtJSon = new OutputStreamWriter( oos, "UTF-8" );
 
 
+    //write test checking to see what happens if the input stream is closed or wrong.
+    //TODO: make multipart streaming functional
+    //currently only stores the collection in memory then flushes it.
+    private void exportApplicationsForOrg( Map.Entry<UUID, String> organization, final ExportInfo config,
+                                           final JobExecution jobExecution ) throws Exception {
 
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
         logger.info( "" + organization );
 
-
         // Loop through the applications per organization
         BiMap<UUID, String> applications = managementService.getApplicationsForOrganization( organization.getKey() );
         for ( Map.Entry<UUID, String> application : applications.entrySet() ) {
 
             logger.info( application.getValue() + " : " + application.getKey() );
 
-            // Get the JSon serializer.
-            //Creates the applications folder
-            /* What needs to be done:
-             * take the file name generator and create one that will only output the collections we need
-              * this will probably icnlude taking both file names, and making sure that it is not doing
-              * two passes as todd had it originally. */
-
-            // JsonGenerator jg = getJsonGenerator( createOutputFile( "application", application.getValue() ) );
-
-            String appFileName =  prepareOutputFileName( "application", application.getValue() );
+            String appFileName = prepareOutputFileName( "application", application.getValue() );
 
             JsonGenerator jg = getJsonGenerator( baos );
 
@@ -243,8 +228,8 @@ public class ExportServiceImpl implements ExportService{
 
             Entity appEntity = rootEm.get( application.getKey() );
 
+            jobExecution.heartbeat();
             Map<String, Object> dictionaries = new HashMap<String, Object>();
-
             for ( String dictionary : rootEm.getDictionaries( appEntity ) ) {
                 Map<Object, Object> dict = rootEm.getDictionaryAsMap( appEntity, dictionary );
 
@@ -273,40 +258,31 @@ public class ExportServiceImpl implements ExportService{
             nsEntity.setMetadata( "counters", entityCounters );
             nsEntity.setMetadata( "collections", collections );
 
+            jobExecution.heartbeat();
             jg.writeStartArray();
             jg.writeObject( nsEntity );
 
-            // Create a GENERATOR for the application collections.
-            //JsonGenerator collectionsJg = getJsonGenerator( createOutputFile( "collections", application.getValue() ) );
-
-            //String collectionsFilename = prepareOutputFileName( "collections","appDummyName" );
-            //JsonGenerator collectionsJg = getJsonGenerator( oos );
-
-
-            //collectionsJg.writeStartObject();
-           // jg.writeStartObject();
             Map<String, Object> metadata = em.getApplicationCollectionMetadata();
-            //don't need to echo as not a command line tool anymore
-            //echo( JsonUtils.mapToFormattedJsonString( metadata ) );
+            long starting_time = System.currentTimeMillis();
 
             // Loop through the collections. This is the only way to loop
             // through the entities in the application (former namespace).
             for ( String collectionName : metadata.keySet() ) {
 
+
                 Query query = new Query();
                 query.setLimit( MAX_ENTITY_FETCH );
                 query.setResultsLevel( Results.Level.ALL_PROPERTIES );
 
                 Results entities = em.searchCollection( em.getApplicationRef(), collectionName, query );
 
-                while ( entities.size() > 0 ) {
 
+                starting_time = checkTimeDelta( starting_time, jobExecution );
+
+                while ( entities.size() > 0 ) {
+                    jobExecution.heartbeat();
                     for ( Entity entity : entities ) {
-                        // Export the entity first and later the collections for
-                        // this entity.
                         jg.writeObject( entity );
-                        //echo( entity );
-
                         saveCollectionMembers( jg, em, application.getValue(), entity );
                     }
 
@@ -324,21 +300,31 @@ public class ExportServiceImpl implements ExportService{
 
             // Close writer and file for this application.
 
-           // logger.warn();
+            // logger.warn();
             jg.writeEndArray();
             jg.close();
-
             baos.flush();
             baos.close();
 
 
-            InputStream is = new ByteArrayInputStream( baos.toByteArray());
-            //InputStream is = new ObjectInputStream );
+            InputStream is = new ByteArrayInputStream( baos.toByteArray() );
             s3Export.copyToS3( is, config );
-            //below line doesn't copy very good data anyways.
         }
     }
 
+
+    public long checkTimeDelta( long startingTime, final JobExecution jobExecution ) {
+
+        long cur_time = System.currentTimeMillis();
+
+        if ( startingTime <= ( cur_time - TIMESTAMP_DELTA ) ) {
+            jobExecution.heartbeat();
+            return cur_time;
+        }
+        return startingTime;
+    }
+
+
     /**
      * Serialize and save the collection members of this <code>entity</code>
      *
@@ -349,6 +335,8 @@ public class ExportServiceImpl implements ExportService{
     private void saveCollectionMembers( JsonGenerator jg, EntityManager em, String application, Entity entity )
             throws Exception {
 
+        long timestamp = System.currentTimeMillis();
+
         Set<String> collections = em.getCollections( entity );
 
         // Only create entry for Entities that have collections
@@ -356,16 +344,14 @@ public class ExportServiceImpl implements ExportService{
             return;
         }
 
-       // jg.writeFieldName( entity.getUuid().toString() );
-        jg.writeStartObject();
 
         for ( String collectionName : collections ) {
 
             jg.writeFieldName( collectionName );
-            // Start collection array.
             jg.writeStartArray();
 
-            Results collectionMembers = em.getCollection( entity, collectionName, null, 100000, Results.Level.IDS, false );
+            Results collectionMembers =
+                    em.getCollection( entity, collectionName, null, 100000, Results.Level.IDS, false );
 
             List<UUID> entityIds = collectionMembers.getIds();
 
@@ -389,11 +375,10 @@ public class ExportServiceImpl implements ExportService{
         jg.writeEndObject();
     }
 
-   // protected JsonGenerator getJsonGenerator( String outFile ) throws IOException {
-   //     return getJsonGenerator( new File( outputDir, outFile ) );
-   // }
 
-    /** Persists the connection for this entity. */
+    /**
+     * Persists the connection for this entity.
+     */
     private void saveDictionaries( Entity entity, EntityManager em, JsonGenerator jg ) throws Exception {
 
         jg.writeFieldName( "dictionaries" );
@@ -424,7 +409,9 @@ public class ExportServiceImpl implements ExportService{
     }
 
 
-    /** Persists the connection for this entity. */
+    /**
+     * Persists the connection for this entity.
+     */
     private void saveConnections( Entity entity, EntityManager em, JsonGenerator jg ) throws Exception {
 
         jg.writeFieldName( "connections" );
@@ -449,7 +436,7 @@ public class ExportServiceImpl implements ExportService{
     }
 
 
-    protected JsonGenerator getJsonGenerator(ByteArrayOutputStream out ) throws IOException {
+    protected JsonGenerator getJsonGenerator( ByteArrayOutputStream out ) throws IOException {
         //TODO:shouldn't the below be UTF-16?
         //PrintWriter out = new PrintWriter( outFile, "UTF-8" );
 
@@ -459,8 +446,9 @@ public class ExportServiceImpl implements ExportService{
         return jg;
     }
 
+
     protected File createOutputFile( String type, String name ) {
-        return new File(prepareOutputFileName( type, name ) );
+        return new File( prepareOutputFileName( type, name ) );
     }
 
 
@@ -492,12 +480,8 @@ public class ExportServiceImpl implements ExportService{
         return outputFileName;
     }
 
+
     @Autowired
     @Override
-    public void setS3Export (S3Export s3Export) { this.s3Export = s3Export; }
-
-
-
-
-
+    public void setS3Export( S3Export s3Export ) { this.s3Export = s3Export; }
 }