You are viewing a plain text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/02/04 17:54:52 UTC

incubator-usergrid git commit: Added a buffer size to auto flush during processing.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-import ee88ea5ba -> 82efd20c7


Added a buffer size to auto flush during processing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/82efd20c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/82efd20c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/82efd20c

Branch: refs/heads/two-dot-o-import
Commit: 82efd20c7be16c26540de52f672d43ff4d2f9c46
Parents: ee88ea5
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Feb 4 09:54:50 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Feb 4 09:54:50 2015 -0700

----------------------------------------------------------------------
 .../importer/FileImportStatistics.java          | 98 ++++++++++++++++----
 .../importer/FileImportStatisticsTest.java      | 95 +++++++++++++++++--
 2 files changed, 167 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/82efd20c/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportStatistics.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportStatistics.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportStatistics.java
index 739c709..f8cabcd 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportStatistics.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportStatistics.java
@@ -21,6 +21,8 @@ package org.apache.usergrid.management.importer;
 
 
 import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.usergrid.persistence.EntityManager;
@@ -32,8 +34,8 @@ import org.apache.usergrid.persistence.exceptions.PersistenceException;
 
 /**
  * Statistics used to track a file import. Only 1 instance of this class should exist per file imported in the cluster.
- * There is a direct 1-1 mapping of the statistics provided here and the file import status.
- * This class is threadsafe to be used across multiple threads.
+ * There is a direct 1-1 mapping of the statistics provided here and the file import status. This class is threadsafe to
+ * be used across multiple threads.
  */
 public class FileImportStatistics {
 
@@ -42,15 +44,26 @@ public class FileImportStatistics {
 
     private final AtomicLong entitiesWritten = new AtomicLong( 0 );
     private final AtomicLong entitiesFailed = new AtomicLong( 0 );
+    private final AtomicInteger cachedOperations = new AtomicInteger( 0 );
 
+    private final Semaphore writeSemaphore = new Semaphore( 1 );
 
     private final FileImport fileImport;
     private final EntityManager entityManager;
+    private final int flushCount;
 
 
-    public FileImportStatistics( final UUID fileImportId, final EntityManager entityManager ) {
+    /**
+     * Create an instance to track counters
+     *
+     * @param entityManager The entity manager that will hold these entities.
+     * @param fileImportId The uuid of the fileImport
+     * @param flushCount The number of success + failures to accumulate before flushing
+     */
+    public FileImportStatistics( final EntityManager entityManager, final UUID fileImportId, final int flushCount ) {
         this.entityManager = entityManager;
-        this.fileImport = getFileImport( fileImportId);
+        this.flushCount = flushCount;
+        this.fileImport = getFileImport( fileImportId );
     }
 
 
@@ -59,6 +72,7 @@ public class FileImportStatistics {
      */
     public void entityWritten() {
         entitiesWritten.incrementAndGet();
+        maybeFlush();
     }
 
 
@@ -66,7 +80,7 @@ public class FileImportStatistics {
      * Invoke when an entity fails to write correctly
      */
 
-    public void entityFailed(final String message ) {
+    public void entityFailed( final String message ) {
         entitiesFailed.incrementAndGet();
 
 
@@ -77,9 +91,10 @@ public class FileImportStatistics {
             failedEntityImport = entityManager.create( failedEntityImport );
             entityManager.createConnection( fileImport, ERRORS_CONNECTION_NAME, failedEntityImport );
         }
-        catch(Exception e){
+        catch ( Exception e ) {
             throw new PersistenceException( "Unable to save failed entity import message", e );
         }
+        maybeFlush();
     }
 
 
@@ -119,6 +134,60 @@ public class FileImportStatistics {
 
 
     /**
+     * Return the total number of successful imports + failed imports.  Can be used in resume.
+     * Note that this reflects the counts last written to cassandra, NOT the current state in memory
+     * @return
+     */
+    public long getParsedCount(){
+        final FileImport saved  = getFileImport( fileImport.getUuid() );
+
+        return saved.getFailedEntityCount() + saved.getImportedEntityCount();
+    }
+
+
+    /**
+     * Returns true if we should stop processing.  This will use the following logic
+     *
+     * We've attempted to import over 1k entities After 1k, we have over a 50% failure rate
+     */
+    public boolean shouldStopProcessing() {
+
+        //TODO Dave, George.  What algorithm should we use here?
+        return false;
+    }
+
+
+    private void maybeFlush() {
+        final int count = cachedOperations.incrementAndGet();
+
+        //no op
+        if ( count < flushCount ) {
+            return;
+        }
+
+        //another thread is writing, no op, just return
+        if ( !writeSemaphore.tryAcquire() ) {
+            return;
+        }
+
+        final long failed = entitiesFailed.get();
+        final long written = entitiesWritten.get();
+        final String message;
+
+        if ( failed > 0 ) {
+            message = "Failed to import " + failed + " entities.  Successfully imported " + written + " entities";
+        }
+        else {
+            message = "Successfully imported " + written + " entities";
+        }
+
+        updateFileImport( written, failed, FileImport.State.STARTED, message );
+        cachedOperations.addAndGet( flushCount * -1 );
+        writeSemaphore.release();
+    }
+
+
+    /**
      * Update the file import status with the provided messages
      *
      * @param written The number of files written
@@ -146,16 +215,15 @@ public class FileImportStatistics {
 
     /**
      * Get the FileImport by uuid and return it
-     * @param fileImportId
-     * @return
+     *
      * @throws EntityNotFoundException if we can't find the file import with the given uuid
      */
     private FileImport getFileImport( final UUID fileImportId ) {
 
-       final FileImport fileImport;
+        final FileImport fileImport;
 
         try {
-            fileImport =  entityManager.get( fileImportId, FileImport.class );
+            fileImport = entityManager.get( fileImportId, FileImport.class );
         }
         catch ( Exception e ) {
             throw new RuntimeException( "Unable to load fileImport with id " + fileImportId, e );
@@ -169,14 +237,4 @@ public class FileImportStatistics {
     }
 
 
-    /**
-     * Returns true if we should stop processing.  This will use the following logic
-     *
-     * We've attempted to import over 1k entities After 1k, we have over a 50% failure rate
-     */
-    public boolean stopProcessing() {
-
-        //TODO Dave, George.  What algorithm should we use here?
-        return false;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/82efd20c/stack/services/src/test/java/org/apache/usergrid/management/importer/FileImportStatisticsTest.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/FileImportStatisticsTest.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/FileImportStatisticsTest.java
index e2040fd..15f9ed8 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/FileImportStatisticsTest.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/FileImportStatisticsTest.java
@@ -61,7 +61,7 @@ public class FileImportStatisticsTest {
         when( em.get( importFileId, FileImport.class ) ).thenReturn( fileImport );
 
 
-        final FileImportStatistics fileImportStatistics = new FileImportStatistics( importFileId, em );
+        final FileImportStatistics fileImportStatistics = new FileImportStatistics( em, importFileId, 1000 );
 
         final long expectedCount = 100;
 
@@ -110,7 +110,7 @@ public class FileImportStatisticsTest {
             }
         } );
 
-        final FileImportStatistics fileImportStatistics = new FileImportStatistics( importFileId, em );
+        final FileImportStatistics fileImportStatistics = new FileImportStatistics( em, importFileId, 1000 );
 
         final long expectedSuccess = 100;
 
@@ -181,7 +181,7 @@ public class FileImportStatisticsTest {
         when( em.get( importFileId, FileImport.class ) ).thenReturn( fileImport );
 
 
-        final FileImportStatistics fileImportStatistics = new FileImportStatistics( importFileId, em );
+        final FileImportStatistics fileImportStatistics = new FileImportStatistics( em, importFileId, 1000 );
 
         final long expectedCount = 100;
 
@@ -204,11 +204,94 @@ public class FileImportStatisticsTest {
 
         assertEquals( "Same count expected", expectedCount, updated.getImportedEntityCount() );
 
-        assertEquals("Fail count is 0", 0, updated.getFailedEntityCount());
+        assertEquals( "Fail count is 0", 0, updated.getFailedEntityCount() );
 
-        assertEquals("Correct expected message", "Something bad happened", updated.getErrorMessage());
+        assertEquals( "Correct expected message", "Something bad happened", updated.getErrorMessage() );
 
-        assertEquals("Expected failed state", FileImport.State.FAILED, updated.getState());
+        assertEquals( "Expected failed state", FileImport.State.FAILED, updated.getState() );
+    }
+
+
+    @Test
+    public void testAutoFlushSuccess() throws Exception {
+
+        final EntityManager em = mock( EntityManager.class );
+
+        final UUID importFileId = UUIDGenerator.newTimeUUID();
+
+
+        final FileImport fileImport = new FileImport();
+        fileImport.setUuid( importFileId );
+
+        when( em.get( importFileId, FileImport.class ) ).thenReturn( fileImport );
+
+        //mock up returning the FailedEntityImport instance after save is invoked.
+
+        when( em.create( any( FailedEntityImport.class ) ) ).thenAnswer( new Answer<FailedEntityImport>() {
+            @Override
+            public FailedEntityImport answer( final InvocationOnMock invocation ) throws Throwable {
+                return ( FailedEntityImport ) invocation.getArguments()[0];
+            }
+        } );
+
+        final int expectedSuccess = 100;
+        final int expectedFails = 100;
+        final int expectedFlushCount = 2;
+        final int flushSize = ( expectedFails + expectedFails ) / expectedFlushCount;
+
+        //set this to 1/2, so that we get saved twice
+        final FileImportStatistics fileImportStatistics = new FileImportStatistics( em, importFileId, flushSize );
+
+
+        for ( long i = 0; i < expectedSuccess; i++ ) {
+            fileImportStatistics.entityWritten();
+        }
+
+
+        for ( int i = 0; i < expectedFails; i++ ) {
+            fileImportStatistics.entityFailed( "Failed to write entity " + i );
+        }
+
+
+        fileImportStatistics.complete();
+
+
+        ArgumentCaptor<FileImport> savedFileImport = ArgumentCaptor.forClass( FileImport.class );
+
+        verify( em, times( expectedFlushCount + 1 ) ).update( savedFileImport.capture() );
+
+        final FileImport updated = savedFileImport.getAllValues().get( 2 );
+
+        assertSame( "Same instance should be updated", fileImport, updated );
+
+        assertEquals( "Same count expected", expectedSuccess, updated.getImportedEntityCount() );
+
+        assertEquals( "Same fail expected", expectedFails, updated.getFailedEntityCount() );
+
+        assertEquals( "Correct error message",
+            "Failed to import " + expectedFails + " entities.  Successfully imported " + expectedSuccess + " entities",
+            updated.getErrorMessage() );
+
+        //TODO get the connections from the file import
+
+        ArgumentCaptor<FailedEntityImport> failedEntities = ArgumentCaptor.forClass( FailedEntityImport.class );
+
+        verify( em, times( expectedFails ) )
+            .createConnection( same( fileImport ), eq( "errors" ), failedEntities.capture() );
+
+        //now check all our arguments
+
+        final List<FailedEntityImport> args = failedEntities.getAllValues();
+
+        assertEquals( "Same number of error connections created", expectedFails, args.size() );
+
+
+        for ( int i = 0; i < expectedFails; i++ ) {
+
+            final FailedEntityImport failedImport = args.get( i );
+
+            assertEquals( "Same message expected", "Failed to write entity " + i, failedImport.getErrorMessage() );
+        }
     }
 }