You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/15 17:46:07 UTC

[06/24] storm git commit: fixing FileLock and sharing code with DirLock for file creation logic

fixing FileLock and sharing code with DirLock for file creation logic


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dcc930b9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dcc930b9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dcc930b9

Branch: refs/heads/1.x-branch
Commit: dcc930b9ff663f7539dd00e49a68e1bcdcf308d4
Parents: 2fb0d7d
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Thu Dec 10 19:23:59 2015 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Jan 14 11:34:55 2016 -0800

----------------------------------------------------------------------
 .../org/apache/storm/hdfs/common/HdfsUtils.java |  24 ++++
 .../org/apache/storm/hdfs/spout/DirLock.java    |  33 +++---
 .../org/apache/storm/hdfs/spout/FileLock.java   |  50 +++++---
 .../apache/storm/hdfs/spout/TestDirLock.java    |   5 -
 .../apache/storm/hdfs/spout/TestFileLock.java   | 117 +++++++++++++++++++
 5 files changed, 192 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dcc930b9/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
index 8fc8b0d..e8c32aa 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
@@ -18,10 +18,15 @@
 
 package org.apache.storm.hdfs.common;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.storm.hdfs.spout.DirLock;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -55,6 +60,25 @@ public class HdfsUtils {
     return result;
   }
 
+  /**
+   * Creates {@code file} only if it does not already exist. Returns the open stream on success,
+   * or null if the file already exists or is already being created. Throws on other failures.
+   */
+  public static FSDataOutputStream tryCreateFile(FileSystem fs, Path file) throws IOException {
+    try {
+      return fs.create(file, false); // overwrite=false: fail instead of clobbering an existing file
+    } catch (FileAlreadyExistsException e) {
+      return null;
+    } catch (RemoteException e) {
+      // HDFS may report a concurrent create as a RemoteException wrapping AlreadyBeingCreatedException
+      if (AlreadyBeingCreatedException.class.getName().equals(e.getClassName())) {
+        return null;
+      }
+      throw e; // unexpected error
+    }
+  }
+
+
   public static class Pair<K,V> {
     private K key;
     private V value;

http://git-wip-us.apache.org/repos/asf/storm/blob/dcc930b9/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
index 304f26d..0ff2f37 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
@@ -21,14 +21,16 @@ package org.apache.storm.hdfs.spout;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
-import org.apache.hadoop.ipc.RemoteException;
+import org.apache.storm.hdfs.common.HdfsUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
 
+/**
+ * Facility to synchronize access to an HDFS directory. The lock itself is represented
+ * as a file in the same directory. Relies on atomic file creation.
+ */
 public class DirLock {
   private FileSystem fs;
   private final Path lockFile;
@@ -41,7 +43,7 @@ public class DirLock {
     this.lockFile = lockFile;
   }
 
-  /** Returns null if somebody else has a lock
+  /** Tries to get a lock on the dir; returns null if it is already locked
    *
    * @param fs
    * @param dir  the dir on which to get a lock
@@ -50,29 +52,26 @@ public class DirLock {
    */
   public static DirLock tryLock(FileSystem fs, Path dir) throws IOException {
     Path lockFile = new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE );
+
     try {
-      FSDataOutputStream os = fs.create(lockFile, false);
-      if (log.isInfoEnabled()) {
+      FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile); // null => lock file already exists, i.e. dir is locked
+      if (ostream!=null) {
         log.info("Thread ({}) acquired lock on dir {}", threadInfo(), dir);
-      }
-      os.close();
-      return new DirLock(fs, lockFile);
-    } catch (FileAlreadyExistsException e) {
-      log.info("Thread ({}) cannot lock dir {} as its already locked.", threadInfo(), dir);
-      return null;
-    } catch (RemoteException e) {
-      if( e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName()) ) {
+        ostream.close(); // the lock is the file's existence; the stream itself is not needed
+        return new DirLock(fs, lockFile);
+      } else {
         log.info("Thread ({}) cannot lock dir {} as its already locked.", threadInfo(), dir);
         return null;
-      } else { // unexpected error
+      }
+    } catch (IOException e) { // unexpected failure; 'already locked' comes back as a null stream above
         log.error("Error when acquiring lock on dir " + dir, e);
         throw e;
-      }
     }
   }
 
   private static String threadInfo () {
-    return "ThdId=" + Thread.currentThread().getId() + ", ThdName=" + Thread.currentThread().getName();
+    return "ThdId=" + Thread.currentThread().getId() + ", ThdName="
+            + Thread.currentThread().getName(); // identifies the calling thread in lock log messages
   }
 
   /** Release lock on dir by deleting the lock file */

http://git-wip-us.apache.org/repos/asf/storm/blob/dcc930b9/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index f4a6813..1974e44 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -28,26 +28,32 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Collection;
 
+/**
+ * Facility to synchronize access to HDFS files. A thread gains exclusive access to a file by acquiring
+ * a FileLock object. The lock itself is represented as a file on HDFS. Relies on atomic file creation.
+ * The owning thread must heartbeat periodically on the lock to prevent the lock from being deemed
+ * stale (i.e. a lock whose owning thread has died).
+ */
 public class FileLock {
 
   private final FileSystem fs;
   private final String componentID;
   private final Path lockFile;
-  private final FSDataOutputStream stream;
+  private final DataOutputStream lockFileStream;
   private LogEntry lastEntry;
 
   private static final Logger log = LoggerFactory.getLogger(DirLock.class);
 
-  private FileLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId)
+  private FileLock(FileSystem fs, Path lockFile, DataOutputStream lockFileStream, String spoutId)
           throws IOException {
     this.fs = fs;
-    String lockFileName = lockDirPath.toString() + Path.SEPARATOR_CHAR + fileToLock.getName();
-    this.lockFile = new Path(lockFileName);
-    this.stream =  fs.create(lockFile);
+    this.lockFile = lockFile;
+    this.lockFileStream = lockFileStream; // already-open stream to lockFile; closed by release()
     this.componentID = spoutId;
     logProgress("0", false);
   }
@@ -56,7 +62,7 @@ public class FileLock {
           throws IOException {
     this.fs = fs;
     this.lockFile = lockFile;
-    this.stream =  fs.append(lockFile);
+    this.lockFileStream =  fs.append(lockFile);
     this.componentID = spoutId;
     log.debug("Acquired abandoned lockFile {}", lockFile);
     logProgress(entry.fileOffset, true);
@@ -74,22 +80,37 @@ public class FileLock {
     LogEntry entry = new LogEntry(now, componentID, fileOffset);
     String line = entry.toString();
     if(prefixNewLine)
-      stream.writeBytes(System.lineSeparator() + line);
+      lockFileStream.writeBytes(System.lineSeparator() + line);
     else
-      stream.writeBytes(line);
-    stream.flush();
+      lockFileStream.writeBytes(line);
+    lockFileStream.flush();
     lastEntry = entry; // update this only after writing to hdfs
   }
 
   public void release() throws IOException {
-    stream.close();
+    lockFileStream.close(); // close our handle first; deleting the lock file below frees the lock
     fs.delete(lockFile, false);
   }
 
-  // throws exception immediately if not able to acquire lock
-  public static FileLock tryLock(FileSystem hdfs, Path fileToLock, Path lockDirPath, String spoutId)
+  /** Returns a lock on the file, or null if the file is already locked. Throws on unexpected problems. */
+  public static FileLock tryLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId)
          throws IOException {
-    return new FileLock(hdfs, fileToLock, lockDirPath, spoutId);
+    String lockFileName = lockDirPath.toString() + Path.SEPARATOR_CHAR + fileToLock.getName();
+    Path lockFile = new Path(lockFileName); // lock file: same name as the locked file, in lockDirPath
+
+    try {
+      FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile); // null => already locked
+      if (ostream != null) {
+        log.info("Acquired lock on file {}. LockFile={}", fileToLock, lockFile);
+        return new FileLock(fs, lockFile, ostream, spoutId);
+      } else {
+        log.info("Cannot lock file {} as its already locked.", fileToLock);
+        return null;
+      }
+    } catch (IOException e) {
+      log.error("Error when acquiring lock on file " + fileToLock, e);
+      throw e;
+    }
   }
 
   /**
@@ -105,7 +126,7 @@ public class FileLock {
   public static LogEntry getLastEntryIfStale(FileSystem fs, Path lockFile, long olderThan)
           throws IOException {
     if( fs.getFileStatus(lockFile).getModificationTime() >= olderThan ) {
-      // HDFS timestamp may not reflect recent updates, so we double check the
+      //Impt: HDFS timestamp may not reflect recent appends, so we double check the
       // timestamp in last line of file to see when the last update was made
       LogEntry lastEntry =  getLastEntry(fs, lockFile);
       if(lastEntry==null) {
@@ -136,7 +157,6 @@ public class FileLock {
   }
 
   // takes ownership of the lock file
-
   /**
    * Takes ownership of the lock file.
    * @param lockFile

http://git-wip-us.apache.org/repos/asf/storm/blob/dcc930b9/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
index fcfe704..bdb0cdf 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
@@ -40,19 +40,14 @@ import java.io.IOException;
 
 public class TestDirLock {
 
-
   static MiniDFSCluster.Builder builder;
   static MiniDFSCluster hdfsCluster;
   static FileSystem fs;
   static String hdfsURI;
   static HdfsConfiguration conf = new  HdfsConfiguration();
 
-
-  @Rule
-  public TemporaryFolder tempFolder = new TemporaryFolder();
   private Path lockDir = new Path("/tmp/lockdir");
 
-
   @BeforeClass
   public static void setupClass() throws IOException {
     conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000");

http://git-wip-us.apache.org/repos/asf/storm/blob/dcc930b9/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
new file mode 100644
index 0000000..8031041
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.spout;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestFileLock {
+  // One in-process HDFS cluster is started once and shared by all tests in this class
+  static MiniDFSCluster.Builder builder;
+  static MiniDFSCluster hdfsCluster;
+  static FileSystem fs;
+  static String hdfsURI;
+  static HdfsConfiguration conf = new  HdfsConfiguration();
+
+  private Path filesDir = new Path("/tmp/filesdir");
+  private Path locksDir = new Path("/tmp/filesdir/locks"); // must differ from filesDir, else lock file == locked file
+
+  @BeforeClass
+  public static void setupClass() throws IOException {
+    conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000");
+    builder = new MiniDFSCluster.Builder(conf); // pass conf so the setting above takes effect
+    hdfsCluster = builder.build();
+    fs  = hdfsCluster.getFileSystem();
+    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+  }
+
+  @AfterClass
+  public static void teardownClass() throws IOException {
+    fs.close();
+    hdfsCluster.shutdown();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    Assert.assertTrue(fs.mkdirs(filesDir)); // not a Java 'assert': mkdirs must run even without -ea
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fs.delete(filesDir, true); // recursive, so this also removes locksDir
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    // create empty files in filesDir
+    Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+    Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
+    fs.create(file1).close();
+    fs.create(file2).close();
+
+    // acquire lock on file1 and verify it worked
+    FileLock lock1a = FileLock.tryLock(fs, file1, locksDir, "spout1");
+    Assert.assertNotNull(lock1a);
+    Assert.assertTrue(fs.exists(lock1a.getLockFile()));
+    Assert.assertEquals(locksDir, lock1a.getLockFile().getParent()); // verify lock file location
+    Assert.assertEquals(file1.getName(), lock1a.getLockFile().getName()); // verify lock filename
+
+    // acquire another lock on file1 and verify it failed
+    FileLock lock1b = FileLock.tryLock(fs, file1, locksDir, "spout1");
+    Assert.assertNull(lock1b);
+
+    // release lock on file1 and check
+    lock1a.release();
+    Assert.assertFalse(fs.exists(lock1a.getLockFile()));
+
+    // Retry locking and verify
+    FileLock lock1c = FileLock.tryLock(fs, file1, locksDir, "spout1");
+    Assert.assertNotNull(lock1c);
+    Assert.assertTrue(fs.exists(lock1c.getLockFile()));
+    Assert.assertEquals(locksDir, lock1c.getLockFile().getParent()); // verify lock file location
+    Assert.assertEquals(file1.getName(), lock1c.getLockFile().getName()); // verify lock filename
+
+    // try locking another file2 at the same time
+    FileLock lock2a = FileLock.tryLock(fs, file2, locksDir, "spout1");
+    Assert.assertNotNull(lock2a);
+    Assert.assertTrue(fs.exists(lock2a.getLockFile()));
+    Assert.assertEquals(locksDir, lock2a.getLockFile().getParent()); // verify lock file location
+    Assert.assertEquals(file2.getName(), lock2a.getLockFile().getName()); // verify lock filename
+
+    // release both locks
+    lock2a.release();
+    Assert.assertFalse(fs.exists(lock2a.getLockFile()));
+    lock1c.release();
+    Assert.assertFalse(fs.exists(lock1c.getLockFile()));
+  }
+
+
+}