You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/15 17:46:07 UTC
[06/24] storm git commit: fixing FileLock and sharing code with
DirLock for file creation logic
fixing FileLock and sharing code with DirLock for file creation logic
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dcc930b9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dcc930b9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dcc930b9
Branch: refs/heads/1.x-branch
Commit: dcc930b9ff663f7539dd00e49a68e1bcdcf308d4
Parents: 2fb0d7d
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Thu Dec 10 19:23:59 2015 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Jan 14 11:34:55 2016 -0800
----------------------------------------------------------------------
.../org/apache/storm/hdfs/common/HdfsUtils.java | 24 ++++
.../org/apache/storm/hdfs/spout/DirLock.java | 33 +++---
.../org/apache/storm/hdfs/spout/FileLock.java | 50 +++++---
.../apache/storm/hdfs/spout/TestDirLock.java | 5 -
.../apache/storm/hdfs/spout/TestFileLock.java | 117 +++++++++++++++++++
5 files changed, 192 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dcc930b9/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
index 8fc8b0d..e8c32aa 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
@@ -18,10 +18,15 @@
package org.apache.storm.hdfs.common;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.storm.hdfs.spout.DirLock;
import java.io.IOException;
import java.util.ArrayList;
@@ -55,6 +60,25 @@ public class HdfsUtils {
return result;
}
+ /**
+ * Returns true if succeeded. False if file already exists. throws if there was unexpected problem
+ */
+ public static FSDataOutputStream tryCreateFile(FileSystem fs, Path file) throws IOException {
+ try {
+ FSDataOutputStream os = fs.create(file, false);
+ return os;
+ } catch (FileAlreadyExistsException e) {
+ return null;
+ } catch (RemoteException e) {
+ if( e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName()) ) {
+ return null;
+ } else { // unexpected error
+ throw e;
+ }
+ }
+ }
+
+
public static class Pair<K,V> {
private K key;
private V value;
http://git-wip-us.apache.org/repos/asf/storm/blob/dcc930b9/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
index 304f26d..0ff2f37 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
@@ -21,14 +21,16 @@ package org.apache.storm.hdfs.spout;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
-import org.apache.hadoop.ipc.RemoteException;
+import org.apache.storm.hdfs.common.HdfsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
+/**
+ * Facility to synchronize access to an HDFS directory. The lock itself is represented
+ * as a file in the same directory. Relies on atomic file creation.
+ */
public class DirLock {
private FileSystem fs;
private final Path lockFile;
@@ -41,7 +43,7 @@ public class DirLock {
this.lockFile = lockFile;
}
- /** Returns null if somebody else has a lock
+ /** Get a lock on dir if not already locked. Returns null if somebody else has the lock.
*
* @param fs
* @param dir the dir on which to get a lock
@@ -50,29 +52,26 @@ public class DirLock {
*/
public static DirLock tryLock(FileSystem fs, Path dir) throws IOException {
Path lockFile = new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE );
+
try {
- FSDataOutputStream os = fs.create(lockFile, false);
- if (log.isInfoEnabled()) {
+ FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile); // null => lock file already exists / is being created
+ if (ostream!=null) { // we created the lock file, so this thread now owns the dir lock
log.info("Thread ({}) acquired lock on dir {}", threadInfo(), dir);
- }
- os.close();
- return new DirLock(fs, lockFile);
- } catch (FileAlreadyExistsException e) {
- log.info("Thread ({}) cannot lock dir {} as its already locked.", threadInfo(), dir);
- return null;
- } catch (RemoteException e) {
- if( e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName()) ) {
+ ostream.close(); // the lock is the file's existence; the stream itself is not kept
+ return new DirLock(fs, lockFile);
+ } else { // someone else created the lock file first
log.info("Thread ({}) cannot lock dir {} as its already locked.", threadInfo(), dir);
return null;
- } else { // unexpected error
+ }
+ } catch (IOException e) { // tryCreateFile only throws for unexpected errors; log and propagate
log.error("Error when acquiring lock on dir " + dir, e);
throw e;
- }
}
}
private static String threadInfo () {
- return "ThdId=" + Thread.currentThread().getId() + ", ThdName=" + Thread.currentThread().getName();
+ return "ThdId=" + Thread.currentThread().getId() + ", ThdName="
+ + Thread.currentThread().getName(); // identifies the calling thread in the lock log messages
}
/** Release lock on dir by deleting the lock file */
http://git-wip-us.apache.org/repos/asf/storm/blob/dcc930b9/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index f4a6813..1974e44 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -28,26 +28,32 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Collection;
+/**
+ * Facility to synchronize access to HDFS files. A thread gains exclusive access to a file by acquiring
+ * a FileLock object. The lock itself is represented as a file on HDFS. Relies on atomic file creation.
+ * The owning thread must heartbeat periodically on the lock to prevent the lock from being deemed
+ * stale (i.e. a lock whose owning thread has died).
+ */
public class FileLock {
private final FileSystem fs;
private final String componentID;
private final Path lockFile;
- private final FSDataOutputStream stream;
+ private final DataOutputStream lockFileStream;
private LogEntry lastEntry;
- private static final Logger log = LoggerFactory.getLogger(DirLock.class);
+ private static final Logger log = LoggerFactory.getLogger(FileLock.class);
- private FileLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId)
+ /**
+ * Wraps a freshly created lock file and its open stream, then writes the initial progress
+ * entry (file offset "0") for {@code spoutId}.
+ */
+ private FileLock(FileSystem fs, Path lockFile, DataOutputStream lockFileStream, String spoutId)
throws IOException {
this.fs = fs;
- String lockFileName = lockDirPath.toString() + Path.SEPARATOR_CHAR + fileToLock.getName();
- this.lockFile = new Path(lockFileName);
- this.stream = fs.create(lockFile);
+ this.lockFile = lockFile;
+ this.lockFileStream = lockFileStream;
this.componentID = spoutId;
logProgress("0", false);
}
@@ -56,7 +62,7 @@ public class FileLock {
throws IOException {
this.fs = fs;
this.lockFile = lockFile;
- this.stream = fs.append(lockFile);
+ this.lockFileStream = fs.append(lockFile);
this.componentID = spoutId;
log.debug("Acquired abandoned lockFile {}", lockFile);
logProgress(entry.fileOffset, true);
@@ -74,22 +80,37 @@ public class FileLock {
LogEntry entry = new LogEntry(now, componentID, fileOffset);
String line = entry.toString();
if(prefixNewLine)
- stream.writeBytes(System.lineSeparator() + line);
+ lockFileStream.writeBytes(System.lineSeparator() + line);
else
- stream.writeBytes(line);
- stream.flush();
+ lockFileStream.writeBytes(line);
+ lockFileStream.flush();
lastEntry = entry; // update this only after writing to hdfs
}
public void release() throws IOException {
- stream.close();
+ lockFileStream.close();
fs.delete(lockFile, false);
}
- // throws exception immediately if not able to acquire lock
- public static FileLock tryLock(FileSystem hdfs, Path fileToLock, Path lockDirPath, String spoutId)
+ /**
+ * Tries to lock {@code fileToLock} by atomically creating a lock file with the same name in
+ * {@code lockDirPath} and writing the initial progress entry for {@code spoutId}.
+ * @return the lock, or null if the file is already locked
+ * @throws IOException on unexpected problems; a lock file created by this call is removed again
+ * if its initial entry cannot be written
+ */
+ public static FileLock tryLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId)
throws IOException {
- return new FileLock(hdfs, fileToLock, lockDirPath, spoutId);
+ String lockFileName = lockDirPath.toString() + Path.SEPARATOR_CHAR + fileToLock.getName();
+ Path lockFile = new Path(lockFileName);
+
+ try {
+ FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile);
+ if (ostream == null) {
+ log.info("Cannot lock file {} as its already locked.", fileToLock);
+ return null;
+ }
+ try {
+ FileLock lock = new FileLock(fs, lockFile, ostream, spoutId);
+ log.info("Acquired lock on file {}. LockFile={}", fileToLock, lockFile);
+ return lock;
+ } catch (IOException e) {
+ // We own the just-created lock file; don't leak its stream or leave it behind ownerless,
+ // which would keep the file locked until the lock is deemed stale.
+ try {
+ ostream.close();
+ } catch (IOException closeError) {
+ e.addSuppressed(closeError);
+ }
+ try {
+ fs.delete(lockFile, false);
+ } catch (IOException deleteError) {
+ e.addSuppressed(deleteError);
+ }
+ throw e;
+ }
+ } catch (IOException e) {
+ log.error("Error when acquiring lock on file " + fileToLock, e);
+ throw e;
+ }
}
/**
@@ -105,7 +126,7 @@ public class FileLock {
public static LogEntry getLastEntryIfStale(FileSystem fs, Path lockFile, long olderThan)
throws IOException {
if( fs.getFileStatus(lockFile).getModificationTime() >= olderThan ) {
- // HDFS timestamp may not reflect recent updates, so we double check the
+ //Impt: HDFS timestamp may not reflect recent appends, so we double check the
// timestamp in last line of file to see when the last update was made
LogEntry lastEntry = getLastEntry(fs, lockFile);
if(lastEntry==null) {
@@ -136,7 +157,6 @@ public class FileLock {
}
// takes ownership of the lock file
-
/**
* Takes ownership of the lock file.
* @param lockFile
http://git-wip-us.apache.org/repos/asf/storm/blob/dcc930b9/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
index fcfe704..bdb0cdf 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
@@ -40,19 +40,14 @@ import java.io.IOException;
public class TestDirLock {
-
static MiniDFSCluster.Builder builder;
static MiniDFSCluster hdfsCluster;
static FileSystem fs;
static String hdfsURI;
static HdfsConfiguration conf = new HdfsConfiguration();
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
private Path lockDir = new Path("/tmp/lockdir");
-
@BeforeClass
public static void setupClass() throws IOException {
conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000");
http://git-wip-us.apache.org/repos/asf/storm/blob/dcc930b9/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
new file mode 100644
index 0000000..8031041
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.spout;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestFileLock {
+
+ static MiniDFSCluster.Builder builder;
+ static MiniDFSCluster hdfsCluster;
+ static FileSystem fs;
+ static String hdfsURI;
+ static HdfsConfiguration conf = new HdfsConfiguration();
+
+ // Data files and lock files must live in different dirs: a lock file is named after the file it
+ // locks, so sharing one dir would make every lock attempt collide with the data file itself.
+ private Path filesDir = new Path("/tmp/filesdir");
+ private Path locksDir = new Path("/tmp/locksdir");
+
+ @BeforeClass
+ public static void setupClass() throws IOException {
+ conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000");
+ builder = new MiniDFSCluster.Builder(conf); // pass conf so the setting above takes effect
+ hdfsCluster = builder.build();
+ fs = hdfsCluster.getFileSystem();
+ hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+ }
+
+ @AfterClass
+ public static void teardownClass() throws IOException {
+ fs.close();
+ hdfsCluster.shutdown();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ // JUnit assertions, not Java 'assert': mkdirs must run even when -ea is off
+ Assert.assertTrue(fs.mkdirs(filesDir));
+ Assert.assertTrue(fs.mkdirs(locksDir));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ fs.delete(filesDir, true);
+ fs.delete(locksDir, true);
+ }
+
+ /** Asserts that {@code lock} was acquired and that its lock file is locksDir/(name of file). */
+ private void verifyLock(FileLock lock, Path file) throws IOException {
+ Assert.assertNotNull(lock);
+ Assert.assertTrue(fs.exists(lock.getLockFile()));
+ Assert.assertEquals(locksDir, lock.getLockFile().getParent()); // verify lock file location
+ Assert.assertEquals(file.getName(), lock.getLockFile().getName()); // verify lock file name
+ }
+
+ @Test
+ public void testBasic() throws Exception {
+ // create empty files in filesDir
+ Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+ Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
+ fs.create(file1).close();
+ fs.create(file2).close();
+
+ // acquire lock on file1 and verify it worked
+ FileLock lock1a = FileLock.tryLock(fs, file1, locksDir, "spout1");
+ verifyLock(lock1a, file1);
+
+ // acquire another lock on file1 and verify it failed
+ FileLock lock1b = FileLock.tryLock(fs, file1, locksDir, "spout1");
+ Assert.assertNull(lock1b);
+
+ // release lock on file1 and check
+ lock1a.release();
+ Assert.assertFalse(fs.exists(lock1a.getLockFile()));
+
+ // Retry locking and verify
+ FileLock lock1c = FileLock.tryLock(fs, file1, locksDir, "spout1");
+ verifyLock(lock1c, file1);
+
+ // try locking another file2 at the same time
+ FileLock lock2a = FileLock.tryLock(fs, file2, locksDir, "spout1");
+ verifyLock(lock2a, file2);
+
+ // release both locks
+ lock2a.release();
+ Assert.assertFalse(fs.exists(lock2a.getLockFile()));
+ lock1c.release();
+ Assert.assertFalse(fs.exists(lock1c.getLockFile()));
+ }
+
+
+}