Posted to commits@storm.apache.org by pa...@apache.org on 2016/01/14 21:55:53 UTC

[02/20] storm git commit: Fixing DirLock. Additional tests for it. Due to a problem in Hadoop 2.6.0 with concurrent file creation, upgrading to Hadoop 2.6.1.

Fixing DirLock. Additional tests for it. Due to a problem in Hadoop 2.6.0 with concurrent file creation, upgrading to Hadoop 2.6.1.
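
Background for context: on Hadoop 2.6.0, when two clients race to fs.create(path, overwrite=false) the same path, the loser may see a RemoteException wrapping AlreadyBeingCreatedException instead of a plain FileAlreadyExistsException, so the patch makes DirLock.tryLock() unwrap that case and treat it as "dir already locked". Below is a minimal sketch of that pattern, not the committed code; the helper name tryCreateLockFile and the fs/lockFile parameters are illustrative.

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileAlreadyExistsException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
    import org.apache.hadoop.ipc.RemoteException;

    /** Sketch only: true if the lock file was created, false if another caller already holds it. */
    static boolean tryCreateLockFile(FileSystem fs, Path lockFile) throws IOException {
      try {
        FSDataOutputStream os = fs.create(lockFile, false); // overwrite=false: fails if the file exists
        os.close();
        return true;                                        // lock acquired
      } catch (FileAlreadyExistsException e) {
        return false;                                       // dir already locked
      } catch (RemoteException e) {
        // Hadoop 2.6.x can surface the losing concurrent create as AlreadyBeingCreatedException
        // wrapped in a RemoteException rather than as FileAlreadyExistsException.
        if (AlreadyBeingCreatedException.class.getName().equals(e.getClassName())) {
          return false;                                     // treat as already locked
        }
        throw e;                                            // anything else is unexpected
      }
    }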


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2fb0d7d9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2fb0d7d9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2fb0d7d9

Branch: refs/heads/master
Commit: 2fb0d7d980c4f8c328905249b3ff5ed5e64c1558
Parents: 5793cdd
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Thu Dec 10 18:01:20 2015 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Jan 14 11:34:55 2016 -0800

----------------------------------------------------------------------
 .../org/apache/storm/hdfs/spout/DirLock.java    | 21 ++++--
 .../apache/storm/hdfs/spout/TestDirLock.java    | 68 ++++++++++++++------
 pom.xml                                         |  4 +-
 3 files changed, 69 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2fb0d7d9/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
index ef02a8f..304f26d 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
@@ -19,13 +19,15 @@
 package org.apache.storm.hdfs.spout;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.ipc.RemoteException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 
 public class DirLock {
   private FileSystem fs;
@@ -43,26 +45,37 @@ public class DirLock {
    *
    * @param fs
    * @param dir  the dir on which to get a lock
-   * @return lock object
+   * @return The lock object if the lock was acquired. Returns null if the dir is already locked.
    * @throws IOException if there were errors
    */
   public static DirLock tryLock(FileSystem fs, Path dir) throws IOException {
     Path lockFile = new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE );
     try {
       FSDataOutputStream os = fs.create(lockFile, false);
-      if(log.isInfoEnabled()) {
-        log.info("Thread acquired dir lock  " + threadInfo() + " - lockfile " + lockFile);
+      if (log.isInfoEnabled()) {
+        log.info("Thread ({}) acquired lock on dir {}", threadInfo(), dir);
       }
       os.close();
       return new DirLock(fs, lockFile);
     } catch (FileAlreadyExistsException e) {
+      log.info("Thread ({}) cannot lock dir {} as it's already locked.", threadInfo(), dir);
       return null;
+    } catch (RemoteException e) {
+      if( e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName()) ) {
+        log.info("Thread ({}) cannot lock dir {} as it's already locked.", threadInfo(), dir);
+        return null;
+      } else { // unexpected error
+        log.error("Error when acquiring lock on dir " + dir, e);
+        throw e;
+      }
     }
   }
 
   private static String threadInfo () {
     return "ThdId=" + Thread.currentThread().getId() + ", ThdName=" + Thread.currentThread().getName();
   }
+
+  /** Release lock on dir by deleting the lock file */
   public void release() throws IOException {
     fs.delete(lockFile, false);
     log.info("Thread {} released dir lock {} ", threadInfo(), lockFile);
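
A hypothetical caller-side view of the API in this file (the method name processDirExclusively and the fs/sourceDir parameters are illustrative, not from the patch): tryLock() returns null when the directory is already locked, and release() deletes the lock file so another instance can take over.

    // Hypothetical usage sketch; assumes the same org.apache.hadoop.fs imports as above.
    static void processDirExclusively(FileSystem fs, Path sourceDir) throws IOException {
      DirLock lock = DirLock.tryLock(fs, sourceDir);
      if (lock == null) {
        return;             // another instance owns the directory; try again later
      }
      try {
        // safe to work on files under sourceDir while the lock is held
      } finally {
        lock.release();     // deletes the lock file so others can acquire the dir
      }
    }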

http://git-wip-us.apache.org/repos/asf/storm/blob/2fb0d7d9/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
index 9686fd8..fcfe704 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
@@ -20,6 +20,7 @@ package org.apache.storm.hdfs.spout;
 
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -44,7 +45,7 @@ public class TestDirLock {
   static MiniDFSCluster hdfsCluster;
   static FileSystem fs;
   static String hdfsURI;
-  static Configuration conf = new  HdfsConfiguration();
+  static HdfsConfiguration conf = new  HdfsConfiguration();
 
 
   @Rule
@@ -54,6 +55,7 @@ public class TestDirLock {
 
   @BeforeClass
   public static void setupClass() throws IOException {
+    conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000");
     builder = new MiniDFSCluster.Builder(new Configuration());
     hdfsCluster = builder.build();
     fs  = hdfsCluster.getFileSystem();
@@ -76,19 +78,36 @@ public class TestDirLock {
     fs.delete(lockDir, true);
   }
 
-//  @Test
+
+  @Test
+  public void testBasicLocking() throws Exception {
+    // 1 grab lock
+    DirLock lock = DirLock.tryLock(fs, lockDir);
+    Assert.assertTrue(fs.exists(lock.getLockFile()));
+
+    // 2 try to grab another lock while dir is locked
+    DirLock lock2 = DirLock.tryLock(fs, lockDir); // should fail
+    Assert.assertNull(lock2);
+
+    // 3 let go first lock
+    lock.release();
+    Assert.assertFalse(fs.exists(lock.getLockFile()));
+
+    // 4 try locking again
+    lock2  = DirLock.tryLock(fs, lockDir);
+    Assert.assertTrue(fs.exists(lock2.getLockFile()));
+    lock2.release();
+    Assert.assertFalse(fs.exists(lock.getLockFile()));
+    lock2.release();  // should not throw
+  }
+
+
+  @Test
   public void testConcurrentLocking() throws Exception {
-//    -Dlog4j.configuration=config
-    Logger.getRootLogger().setLevel(Level.ERROR);
-    DirLockingThread[] thds = startThreads(10, lockDir );
-    for (DirLockingThread thd : thds) {
-      thd.start();
-    }
-    System.err.println("Thread creation complete");
-    Thread.sleep(5000);
+    DirLockingThread[] thds = startThreads(100, lockDir );
     for (DirLockingThread thd : thds) {
-      thd.join(1000);
-      if(thd.isAlive() && thd.cleanExit)
+      thd.join();
+      if( !thd.cleanExit)
         System.err.println(thd.getName() + " did not exit cleanly");
       Assert.assertTrue(thd.cleanExit);
     }
@@ -97,14 +116,16 @@ public class TestDirLock {
     Assert.assertFalse(fs.exists(lockFile));
   }
 
-
-
   private DirLockingThread[] startThreads(int thdCount, Path dir)
           throws IOException {
     DirLockingThread[] result = new DirLockingThread[thdCount];
     for (int i = 0; i < thdCount; i++) {
       result[i] = new DirLockingThread(i, fs, dir);
     }
+
+    for (DirLockingThread thd : result) {
+      thd.start();
+    }
     return result;
   }
 
@@ -123,20 +144,31 @@ public class TestDirLock {
 
     @Override
     public void run() {
+      DirLock lock = null;
       try {
-        DirLock lock;
         do {
+          System.err.println("Trying lock " + getName());
           lock = DirLock.tryLock(fs, dir);
+          System.err.println("Acquired lock " + getName());
           if(lock==null) {
             System.out.println("Retrying lock - " + Thread.currentThread().getId());
           }
         } while (lock==null);
-        lock.release();
         cleanExit= true;
-      } catch (IOException e) {
+      } catch (Exception e) {
         e.printStackTrace();
       }
-
+      finally {
+          try {
+            if(lock!=null) {
+              lock.release();
+              System.err.println("Released lock " + getName());
+            }
+          } catch (IOException e) {
+            e.printStackTrace(System.err);
+          }
+      }
+      System.err.println("Thread exiting " + getName());
     }
 
   }
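
The test threads above spin on DirLock.tryLock() until they win the lock. Packaged as a standalone helper, that retry loop looks roughly like the sketch below; acquireWithRetry and retryDelayMs are hypothetical names, and the backoff sleep is an addition for illustration since the test itself retries immediately.

    // Illustrative helper mirroring DirLockingThread's do/while loop.
    static DirLock acquireWithRetry(FileSystem fs, Path dir, long retryDelayMs)
            throws IOException, InterruptedException {
      DirLock lock = DirLock.tryLock(fs, dir);
      while (lock == null) {
        Thread.sleep(retryDelayMs);       // brief backoff before retrying (not in the test)
        lock = DirLock.tryLock(fs, dir);
      }
      return lock;
    }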

http://git-wip-us.apache.org/repos/asf/storm/blob/2fb0d7d9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 610f7e9..fed5d3b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -213,7 +213,7 @@
         <clojure.tools.cli.version>0.2.4</clojure.tools.cli.version>
         <disruptor.version>3.3.2</disruptor.version>
         <jgrapht.version>0.9.0</jgrapht.version>
-        <guava.version>16.0.1</guava.version>
+        <guava.version>15.0</guava.version>
         <netty.version>3.9.0.Final</netty.version>
         <log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version>
         <log4j.version>2.1</log4j.version>
@@ -227,7 +227,7 @@
         <clojure-data-codec.version>0.1.0</clojure-data-codec.version>
         <clojure-contrib.version>1.2.0</clojure-contrib.version>
         <hive.version>0.14.0</hive.version>
-        <hadoop.version>2.6.0</hadoop.version>
+        <hadoop.version>2.6.1</hadoop.version>
         <kryo.version>2.21</kryo.version>
         <servlet.version>2.5</servlet.version>
         <joda-time.version>2.3</joda-time.version>