You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/12 23:28:27 UTC

[08/27] incubator-usergrid git commit: Changed barriers to sockets

Changed barriers to sockets


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d2e5da60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d2e5da60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d2e5da60

Branch: refs/heads/USERGRID-365
Commit: d2e5da60f95eac0f14861b72ba7817fd37eee568
Parents: 5aca048
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Jan 20 14:55:17 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Jan 20 14:55:17 2015 -0700

----------------------------------------------------------------------
 .../usergrid/lock/MultiProcessBarrier.java      | 54 +++++++++++++-------
 .../usergrid/lock/MultiProcessLocalLock.java    | 51 ++++++++----------
 .../usergrid/setup/SpringIntegrationRunner.java | 39 +++-----------
 .../org/apache/usergrid/setup/SystemSetup.java  | 30 +++--------
 .../usergrid/lock/MultiProcessBarrierTest.java  | 10 ++--
 .../lock/MultiProcessLocalLockTest.java         | 27 +++++++---
 6 files changed, 99 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d2e5da60/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessBarrier.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessBarrier.java b/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessBarrier.java
index cd5a1de..7b4e290 100644
--- a/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessBarrier.java
+++ b/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessBarrier.java
@@ -20,26 +20,29 @@
 package org.apache.usergrid.lock;
 
 
-import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
 import java.util.concurrent.TimeoutException;
 
 
 /**
- * A barrier between processes and threads. Everyone will await until proceed has been invoked by a single
- * thread.  Other threads will proceed after wait time.
+ * A barrier between processes and threads. Everyone will await until proceed has been invoked by a single thread.
+ * Other threads will proceed after wait time.
  */
 public class MultiProcessBarrier {
 
     /**
      * The sleep time to wait before checking.
      */
-    private static final long SLEEP_TIME = 100;
-    public final File barrierFile;
+    private static final int SLEEP_TIME = 100;
+    public final int barrierPort;
+    public ServerSocket serverSocket;
 
 
-    public MultiProcessBarrier( final String barrierFileName ) {
-        this.barrierFile = new File( barrierFileName );
+    public MultiProcessBarrier( final int barrierPort ) {
+        this.barrierPort = barrierPort;
     }
 
 
@@ -47,31 +50,46 @@ public class MultiProcessBarrier {
      * Notify the other processes they can proceed.
      */
     public void proceed() throws IOException {
-        barrierFile.mkdirs();
-        barrierFile.createNewFile();
+        serverSocket = new ServerSocket( barrierPort );
     }
 
 
     /**
      * Await the specified file.  If it exists, it will proceed
-     * @param timeout
-     * @throws InterruptedException
-     * @throws TimeoutException
      */
-    public void await(final long timeout) throws InterruptedException, TimeoutException {
+    public void await( final long timeout ) throws InterruptedException, TimeoutException {
 
         final long stopTime = System.currentTimeMillis() + timeout;
 
-        while(System.currentTimeMillis() < stopTime){
+        while ( System.currentTimeMillis() < stopTime ) {
+
+
+            try {
+                Socket client = new Socket();
+                client.connect( new InetSocketAddress( "127.0.0.1", barrierPort ), SLEEP_TIME );
+                //if we get here we're good, the client can connect and close
+                client.close();
+
+                finalize();
 
-            //barrier is done break
-            if(barrierFile.exists()){
                 return;
             }
-
-            Thread.sleep( SLEEP_TIME );
+            catch ( IOException e ) {
+                //not open swallow and retry
+            }
+            catch ( Throwable throwable ) {
+                throw new RuntimeException( "Something unexpected happened", throwable );
+            }
         }
 
         throw new TimeoutException( "Timeout out after " + timeout + " milliseconds waiting for the file" );
     }
+
+
+    @Override
+    protected void finalize() throws Throwable {
+        if ( serverSocket != null && !serverSocket.isClosed() ) {
+            serverSocket.close();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d2e5da60/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessLocalLock.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessLocalLock.java b/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessLocalLock.java
index 304ed7a..c109f2c 100644
--- a/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessLocalLock.java
+++ b/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessLocalLock.java
@@ -20,12 +20,8 @@
 package org.apache.usergrid.lock;
 
 
-import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.channels.OverlappingFileLockException;
+import java.net.ServerSocket;
 
 
 /**
@@ -34,15 +30,15 @@ import java.nio.channels.OverlappingFileLockException;
  */
 public class MultiProcessLocalLock {
 
-    private final String fileName;
-    private FileLock lock;
+    private final int socketNumber;
+    private ServerSocket lock;
 
 
     /**
      * The filename to use as the lock
      */
-    public MultiProcessLocalLock( final String fileName ) {
-        this.fileName = fileName;
+    public MultiProcessLocalLock( final int socketNumber ) {
+        this.socketNumber = socketNumber;
     }
 
 
@@ -57,26 +53,16 @@ public class MultiProcessLocalLock {
             throw new IllegalStateException( "You already have a lock, you cannot get a lock again" );
         }
 
-        File file = new File( fileName );
-
-        if ( !file.exists() ) {
-            file.getParentFile().mkdirs();
-            file.createNewFile();
-        }
-
-        // get a file channel
-        FileChannel fileChannel = new RandomAccessFile( file, "rw" ).getChannel();
-
         try {
-            lock = fileChannel.tryLock();
+            lock = new ServerSocket( socketNumber );
         }
-        //we don't have the lock, ignore
-        catch(OverlappingFileLockException ofle){
+        catch ( IOException ioe ) {
+            //swallow, we didn't get the lock
             return false;
         }
 
 
-        return hasLock();
+        return true;
     }
 
 
@@ -89,7 +75,7 @@ public class MultiProcessLocalLock {
         }
 
 
-        lock.release();
+        lock.close();
 
         lock = null;
     }
@@ -97,22 +83,25 @@ public class MultiProcessLocalLock {
 
     /**
      * Return true if this instance has the lock
-     * @return
      */
-    public boolean hasLock(){
+    public boolean hasLock() {
         return lock != null;
     }
 
+
     /**
-     * Releases the lock if we have it, otherwise is a no-op
-     * @return
+     * Releases the lock if we have it, otherwise is a no-op.
+     *
+     * @return true if we had the lock and released it.  False if we didn't have the lock
      */
-    public void maybeReleaseLock() throws IOException {
+    public boolean maybeReleaseLock() throws IOException {
 
-        if(lock == null){
-            return;
+        if ( lock == null ) {
+            return false;
         }
 
         releaseLock();
+
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d2e5da60/stack/test-utils/src/main/java/org/apache/usergrid/setup/SpringIntegrationRunner.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/main/java/org/apache/usergrid/setup/SpringIntegrationRunner.java b/stack/test-utils/src/main/java/org/apache/usergrid/setup/SpringIntegrationRunner.java
index d9fd2cc..a651fe3 100644
--- a/stack/test-utils/src/main/java/org/apache/usergrid/setup/SpringIntegrationRunner.java
+++ b/stack/test-utils/src/main/java/org/apache/usergrid/setup/SpringIntegrationRunner.java
@@ -20,15 +20,9 @@
 package org.apache.usergrid.setup;
 
 
-import org.junit.ClassRule;
 import org.junit.runners.BlockJUnit4ClassRunner;
 import org.junit.runners.model.InitializationError;
 import org.junit.runners.model.Statement;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
-import org.apache.usergrid.cassandra.CassandraResource;
-import org.apache.usergrid.cassandra.SpringResource;
-import org.apache.usergrid.persistence.index.impl.ElasticSearchResource;
 
 
 /**
@@ -37,10 +31,7 @@ import org.apache.usergrid.persistence.index.impl.ElasticSearchResource;
 public class SpringIntegrationRunner extends BlockJUnit4ClassRunner {
 
 
-
-    private static boolean initialized;
-
-
+    private static SystemSetup setup;
 
 
     /**
@@ -54,37 +45,23 @@ public class SpringIntegrationRunner extends BlockJUnit4ClassRunner {
     @Override
     protected Statement withBeforeClasses( final Statement statement ) {
 
-        if(!initialized){
-            runSetup();
-        }
-
-
-        final Statement toReturn =  super.withBeforeClasses( statement );
-
-        return toReturn;
-
+        singletonSetup();
+        return super.withBeforeClasses( statement );
     }
 
 
-    /**
-     * Run the setup once per JVM
-     */
-    public static synchronized void runSetup() {
+    private static synchronized void singletonSetup() {
 
-        if(initialized){
+        if ( setup != null ) {
             return;
         }
 
+        setup = new SystemSetup();
         try {
-            new SystemSetup().maybeInitialize();
+            setup.maybeInitialize();
         }
         catch ( Exception e ) {
-            throw new RuntimeException( "Unable to initialize the system", e );
+            throw new RuntimeException( "Unable to start subsystem" );
         }
-
-
-        initialized = true;
-
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d2e5da60/stack/test-utils/src/main/java/org/apache/usergrid/setup/SystemSetup.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/main/java/org/apache/usergrid/setup/SystemSetup.java b/stack/test-utils/src/main/java/org/apache/usergrid/setup/SystemSetup.java
index 0681c25..9460365 100644
--- a/stack/test-utils/src/main/java/org/apache/usergrid/setup/SystemSetup.java
+++ b/stack/test-utils/src/main/java/org/apache/usergrid/setup/SystemSetup.java
@@ -37,15 +37,15 @@ import org.apache.usergrid.persistence.index.impl.ElasticSearchResource;
  */
 public class SystemSetup {
 
-    private static final String TEMP_FILE_PATH = "target/surefirelocks";
-    public static final String LOCK_NAME = TEMP_FILE_PATH + "/lock";
-    public static final String START_BARRIER_NAME = TEMP_FILE_PATH + "/start_barrier";
+
+    public static final int LOCK_PORT = Integer.parseInt( System.getProperty( "test.lock.port", "10101") );
+    public static final int START_BARRIER_PORT =  Integer.parseInt( System.getProperty( "test.barrier.port", "10102") );
 
 
     public static final long ONE_MINUTE = 60000;
 
-    final MultiProcessLocalLock lock = new MultiProcessLocalLock( LOCK_NAME );
-    final MultiProcessBarrier barrier = new MultiProcessBarrier( START_BARRIER_NAME );
+    final MultiProcessLocalLock lock = new MultiProcessLocalLock( LOCK_PORT );
+    final MultiProcessBarrier barrier = new MultiProcessBarrier( START_BARRIER_PORT );
 
 
     /**
@@ -65,6 +65,7 @@ public class SystemSetup {
 
             SpringResource.getInstance().migrate();
 
+
             //signal to other processes we've migrated, and they can proceed
             barrier.proceed();
         }
@@ -72,24 +73,7 @@ public class SystemSetup {
 
         barrier.await( ONE_MINUTE );
 
-        //it doesn't matter who finishes first.  We need to remove the resources so we start correctly next time.
-        //not ideal, but a clean will solve this issue too
-        if ( lock.hasLock() ) {
-
-            //add a shutdown hook so we clean up after ourselves.  Kinda fugly, but works since we can't clean on start
-            Runtime.getRuntime().addShutdownHook( new Thread() {
-
-                public void run() {
-
-                    System.out.println( "Shutdown Hook is running !" );
-                    deleteFile( LOCK_NAME );
-                    deleteFile( START_BARRIER_NAME );
-                }
-            } );
-
-
-            lock.releaseLock();
-        }
+        lock.maybeReleaseLock();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d2e5da60/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessBarrierTest.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessBarrierTest.java b/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessBarrierTest.java
index 3fe0e57..5863041 100644
--- a/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessBarrierTest.java
+++ b/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessBarrierTest.java
@@ -43,11 +43,15 @@ import static org.junit.Assert.fail;
 public class MultiProcessBarrierTest {
 
 
+    public static final int START_BARRIER_PORT =  Integer.parseInt( System.getProperty( "test.barrier.port", "10102") );
+
+
+
     @Test
     public void singleBarrierTest() throws IOException, InterruptedException, TimeoutException {
         final String file = newFileName();
 
-        final MultiProcessBarrier barrier = new MultiProcessBarrier( file );
+        final MultiProcessBarrier barrier = new MultiProcessBarrier( START_BARRIER_PORT );
 
 
         try {
@@ -80,7 +84,7 @@ public class MultiProcessBarrierTest {
 
         //now create the barrier and execute it
 
-        MultiProcessBarrier barrier = new MultiProcessBarrier( file );
+        MultiProcessBarrier barrier = new MultiProcessBarrier( START_BARRIER_PORT );
         barrier.proceed();
 
         assertTrue( "other barriers proceeded", latch.await( 1000, TimeUnit.MILLISECONDS ) );
@@ -120,7 +124,7 @@ public class MultiProcessBarrierTest {
         @Override
         public void run() {
 
-            MultiProcessBarrier barrier = new MultiProcessBarrier( fileName );
+            MultiProcessBarrier barrier = new MultiProcessBarrier( START_BARRIER_PORT );
 
 
             try {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d2e5da60/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessLocalLockTest.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessLocalLockTest.java b/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessLocalLockTest.java
index 44c8e20..1f3663f 100644
--- a/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessLocalLockTest.java
+++ b/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessLocalLockTest.java
@@ -34,6 +34,8 @@ import static org.junit.Assert.*;
  */
 public class MultiProcessLocalLockTest {
 
+    public static final int LOCK_PORT = Integer.parseInt( System.getProperty( "test.lock.port", "10101") );
+
     /**
      * Create and verify the single lock
      * @throws IOException
@@ -43,7 +45,7 @@ public class MultiProcessLocalLockTest {
 
         final String lockName = newFileName();
 
-        MultiProcessLocalLock lock = new MultiProcessLocalLock( lockName);
+        MultiProcessLocalLock lock = new MultiProcessLocalLock( LOCK_PORT);
 
         assertTrue(lock.tryLock());
 
@@ -62,7 +64,7 @@ public class MultiProcessLocalLockTest {
 
           final String lockName = newFileName();
 
-          MultiProcessLocalLock lock1 = new MultiProcessLocalLock( lockName );
+          MultiProcessLocalLock lock1 = new MultiProcessLocalLock( LOCK_PORT );
 
           assertTrue( lock1.tryLock() );
 
@@ -70,16 +72,23 @@ public class MultiProcessLocalLockTest {
 
 
           //get lock 2, should fail
-          MultiProcessLocalLock lock2 = new MultiProcessLocalLock( lockName );
+          MultiProcessLocalLock lock2 = new MultiProcessLocalLock( LOCK_PORT );
 
           assertFalse(lock2.tryLock());
 
           assertFalse(lock2.hasLock());
 
 
-
           //release lock1
-          lock1.maybeReleaseLock();
+          boolean lock1release = lock1.maybeReleaseLock();
+
+          assertTrue( "lock released", lock1release );
+
+          boolean lock2release = lock2.maybeReleaseLock();
+
+          assertFalse( "lock released", lock2release );
+
+
 
 
           //should succeed
@@ -91,7 +100,13 @@ public class MultiProcessLocalLockTest {
 
           assertFalse(lock1.hasLock());
 
-          lock2.maybeReleaseLock();
+          lock1release = lock1.maybeReleaseLock();
+
+          assertFalse( "lock released", lock1release );
+
+          lock2release = lock2.maybeReleaseLock();
+
+          assertTrue( "lock released", lock2release );
 
       }