You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/12 23:28:22 UTC

[03/27] incubator-usergrid git commit: Created a multi process barrier and lock utility. This will be used in our listeners to initialize and clean Cassandra.

Created a multi process barrier and lock utility.  This will be used in our listeners to initialize and clean Cassandra.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/938855f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/938855f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/938855f8

Branch: refs/heads/USERGRID-365
Commit: 938855f8b9c060a8b325d362f91679a0f421012e
Parents: 61a81d5
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Dec 12 14:13:59 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Dec 12 14:13:59 2014 -0700

----------------------------------------------------------------------
 .../usergrid/lock/MultiProcessBarrier.java      |  77 +++++++++++
 .../usergrid/lock/MultiProcessLocalLock.java    | 117 ++++++++++++++++
 .../usergrid/lock/MultiProcessBarrierTest.java  | 136 +++++++++++++++++++
 .../lock/MultiProcessLocalLockTest.java         | 103 ++++++++++++++
 4 files changed, 433 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/938855f8/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessBarrier.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessBarrier.java b/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessBarrier.java
new file mode 100644
index 0000000..cd5a1de
--- /dev/null
+++ b/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessBarrier.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.lock;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+
+/**
+ * A barrier between processes and threads on the same machine, backed by a marker file.  Everyone blocks in
+ * {@link #await(long)} until a single caller invokes {@link #proceed()}, which creates the marker file.
+ */
+public class MultiProcessBarrier {
+
+    /**
+     * The sleep time, in milliseconds, to wait between checks for the barrier file.
+     */
+    private static final long SLEEP_TIME = 100;
+    public final File barrierFile;
+
+
+    public MultiProcessBarrier( final String barrierFileName ) {
+        this.barrierFile = new File( barrierFileName );
+    }
+
+
+    /**
+     * Notify the other processes they can proceed by creating the barrier file.  Only missing parent
+     * directories are created with mkdirs; the barrier path itself is created as a regular file.
+     */
+    public void proceed() throws IOException {
+        final File parent = barrierFile.getAbsoluteFile().getParentFile();
+        if ( parent != null ) {
+            parent.mkdirs();
+        }
+        barrierFile.createNewFile();
+    }
+
+
+    /**
+     * Await the barrier file, returning as soon as it exists.  The file is always checked at least once.
+     * @param timeout the maximum time to wait, in milliseconds
+     * @throws InterruptedException if the thread is interrupted while sleeping between checks
+     * @throws TimeoutException if the file still does not exist once the timeout has elapsed
+     */
+    public void await(final long timeout) throws InterruptedException, TimeoutException {
+        final long startTime = System.currentTimeMillis();
+
+        while ( !barrierFile.exists() ) {
+            //measure elapsed time rather than adding to "now", so a huge timeout cannot overflow
+            final long remaining = timeout - ( System.currentTimeMillis() - startTime );
+            if ( remaining <= 0 ) {
+                throw new TimeoutException( "Timed out after " + timeout + " milliseconds waiting for " + barrierFile );
+            }
+            Thread.sleep( Math.min( SLEEP_TIME, remaining ) );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/938855f8/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessLocalLock.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessLocalLock.java b/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessLocalLock.java
new file mode 100644
index 0000000..ba059c3
--- /dev/null
+++ b/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessLocalLock.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.lock;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+
+
+/**
+ * A lock that will work across multiple processes and threads on the same machine. This uses the file system to
+ * attempt to obtain a lock.  No blocking is performed, the lock is either successful or fails.  The channel opened
+ * on the lock file is closed again when the lock is released.
+ */
+public class MultiProcessLocalLock {
+
+    private final String fileName;
+    private FileChannel fileChannel;
+    private FileLock lock;
+
+    /** The filename to use as the lock */
+    public MultiProcessLocalLock( final String fileName ) {
+        this.fileName = fileName;
+    }
+
+    /**
+     * Attempts to lock the file, creating it if it does not exist.  If a lock cannot be acquired, false is
+     * returned.  Otherwise, true is returned.
+     *
+     * @return true if the lock was acquired.  False otherwise.
+     */
+    public boolean tryLock() throws IOException {
+        if ( lock != null ) {
+            throw new IllegalStateException( "You already have a lock, you cannot get a lock again" );
+        }
+
+        //"rw" mode creates the file when it is missing.  The channel is cached and reused by later attempts
+        if ( fileChannel == null ) {
+            fileChannel = new RandomAccessFile( new File( fileName ), "rw" ).getChannel();
+        }
+
+        try {
+            lock = fileChannel.tryLock();
+        }
+        //held elsewhere in this JVM.  Keep the channel open: on some systems closing any channel on the file
+        //releases every lock this JVM holds on it, which would silently drop the other holder's lock
+        catch(OverlappingFileLockException ofle){
+            return false;
+        }
+
+        //held by another process.  This JVM holds no lock on the file, so the channel is safe to close
+        if ( lock == null ) {
+            closeChannel();
+        }
+
+        return hasLock();
+    }
+
+    /**
+     * Release the lock and close the lock file's channel.  Fails if we do not hold the lock.
+     */
+    public void releaseLock() throws IOException {
+        if ( lock == null ) {
+            throw new IllegalStateException( "You cannot release a lock you do not have" );
+        }
+
+        try {
+            lock.release();
+        }
+        finally {
+            lock = null;
+            closeChannel();
+        }
+    }
+
+    /** Return true if this instance has the lock */
+    public boolean hasLock(){
+        return lock != null;
+    }
+
+    /** Releases the lock if we have it, otherwise is a no-op */
+    public void maybeReleaseLock() throws IOException {
+        if ( lock != null ) {
+            releaseLock();
+        }
+    }
+
+    /**
+     * Closes the cached channel so its file descriptor is not leaked
+     */
+    private void closeChannel() throws IOException {
+        final FileChannel toClose = fileChannel;
+        fileChannel = null;
+        toClose.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/938855f8/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessBarrierTest.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessBarrierTest.java b/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessBarrierTest.java
new file mode 100644
index 0000000..3fe0e57
--- /dev/null
+++ b/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessBarrierTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.lock;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.security.AccessController;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Test;
+
+import sun.security.action.GetPropertyAction;
+
+import static org.apache.usergrid.TestHelper.newUUIDString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+/**
+ * Tests that the multi process barrier plays nicely across threads
+ */
+public class MultiProcessBarrierTest {
+
+
+    @Test
+    public void singleBarrierTest() throws IOException, InterruptedException, TimeoutException {
+        final String file = newFileName();
+
+        final MultiProcessBarrier barrier = new MultiProcessBarrier( file );
+
+
+        try {
+            barrier.await( 500 );
+            fail( "I should timeout" );
+        }
+        catch ( TimeoutException te ) {
+            //swallow, should timeout
+        }
+
+        //now proceed then await
+        barrier.proceed();
+        barrier.await( 100 );
+    }
+
+
+    @Test
+    public void barrierTest() throws IOException, InterruptedException {
+        final String file = newFileName();
+
+        //create 2 threads
+        final CountDownLatch latch = new CountDownLatch( 2 );
+
+        //create 2 worker threads.  We need to run them, and ensure that they don't countdown.
+
+        new BarrierThread( file, latch ).start();
+        new BarrierThread( file, latch ).start();
+
+        //the barrier file does not exist yet, so neither worker can have passed the barrier
+        //and counted down
+        assertEquals(2, latch.getCount());
+
+        //now create the barrier and execute it
+
+        MultiProcessBarrier barrier = new MultiProcessBarrier( file );
+        barrier.proceed();
+
+        assertTrue( "other barriers proceeded", latch.await( 1000, TimeUnit.MILLISECONDS ) );
+    }
+
+
+    /**
+     * Generate a unique file name in the temp directory.  Nothing is created at that path here; it is
+     * only registered for deletion on JVM exit so the tests do not leave barrier files behind.
+     */
+    private String newFileName() throws IOException {
+        final File barrierFile = new File( System.getProperty( "java.io.tmpdir" ), newUUIDString() );
+        barrierFile.deleteOnExit();
+
+        return barrierFile.getAbsolutePath();
+    }
+
+
+    /**
+     * A simple worker thread that tests we block until proceeding
+     */
+    private static final class BarrierThread extends Thread{
+
+        private final String fileName;
+
+        private final CountDownLatch completeLatch;
+
+
+        private BarrierThread( final String fileName, final CountDownLatch completeLatch) {
+            this.fileName = fileName;
+            this.completeLatch = completeLatch;
+        }
+
+
+        @Override
+        public void run() {
+
+            MultiProcessBarrier barrier = new MultiProcessBarrier( fileName );
+
+
+            try {
+                barrier.await( 10000 );
+            }
+            catch ( Exception e ) {
+                throw new RuntimeException( e );
+            }
+
+            completeLatch.countDown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/938855f8/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessLocalLockTest.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessLocalLockTest.java b/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessLocalLockTest.java
new file mode 100644
index 0000000..44c8e20
--- /dev/null
+++ b/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessLocalLockTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.lock;
+
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import static org.apache.usergrid.TestHelper.newUUIDString;
+import static org.junit.Assert.*;
+
+
+/**
+ * Simple test for multiple process lock
+ */
+public class MultiProcessLocalLockTest {
+
+    /**
+     * Create and verify the single lock
+     */
+    @Test
+    public void singleLock() throws IOException {
+        final String lockName = newFileName();
+
+        MultiProcessLocalLock lock = new MultiProcessLocalLock( lockName);
+
+        try {
+            assertTrue(lock.tryLock());
+            assertTrue(lock.hasLock());
+        }
+        finally {
+            //release even when an assertion fails so the file is not left locked
+            lock.maybeReleaseLock();
+        }
+        assertFalse(lock.hasLock());
+    }
+
+    /**
+     * Verify that two locks on the same file exclude each other, and that releasing one frees it for the other
+     */
+    @Test
+    public void multiLock() throws IOException {
+        final String lockName = newFileName();
+
+        MultiProcessLocalLock lock1 = new MultiProcessLocalLock( lockName );
+        MultiProcessLocalLock lock2 = new MultiProcessLocalLock( lockName );
+
+        try {
+            assertTrue( lock1.tryLock() );
+            assertTrue( lock1.hasLock() );
+
+            //get lock 2, should fail
+            assertFalse(lock2.tryLock());
+            assertFalse(lock2.hasLock());
+
+            //release lock1
+            lock1.maybeReleaseLock();
+
+            //should succeed
+            assertTrue(lock2.tryLock());
+            assertTrue(lock2.hasLock());
+
+            //lock1 is now the one locked out
+            assertFalse(lock1.tryLock());
+            assertFalse(lock1.hasLock());
+        }
+        finally {
+            //release whatever is still held, even when an assertion fails
+            lock1.maybeReleaseLock();
+            lock2.maybeReleaseLock();
+        }
+    }
+
+    /**
+     * Create an empty temp file to lock on, registered for deletion when the JVM exits
+     */
+    private String newFileName() throws IOException {
+        final File lockFile = File.createTempFile( "test", "" );
+        lockFile.deleteOnExit();
+        return lockFile.getAbsolutePath();
+    }
+}
+
+