You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2012/07/17 23:19:39 UTC
svn commit: r1362656 - in /zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/common/
src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/config/
src/java/test/org/apache/zookeeper/test/
Author: phunt
Date: Tue Jul 17 21:19:38 2012
New Revision: 1362656
URL: http://svn.apache.org/viewvc?rev=1362656&view=rev
Log:
ZOOKEEPER-1427. Writing to local files is done non-atomically (phunt)
Added:
zookeeper/trunk/src/java/main/org/apache/zookeeper/common/AtomicFileOutputStream.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/common/IOUtils.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AtomicFileOutputStreamTest.java
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1362656&r1=1362655&r2=1362656&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Tue Jul 17 21:19:38 2012
@@ -207,6 +207,8 @@ BUGFIXES:
takes a long time with large datasets - is correlated to dataset size
(fpj and Thawan Kooburat via camille)
+ ZOOKEEPER-1427. Writing to local files is done non-atomically (phunt)
+
IMPROVEMENTS:
ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/common/AtomicFileOutputStream.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/common/AtomicFileOutputStream.java?rev=1362656&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/common/AtomicFileOutputStream.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/common/AtomicFileOutputStream.java Tue Jul 17 21:19:38 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.common;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * This code is originally from HDFS, see the similarly named files there
+ * in case of bug fixing, history, etc...
+ */
+
+/**
+ * A FileOutputStream that has the property that it will only show up at its
+ * destination once it has been entirely written and flushed to disk. While
+ * being written, it will use a .tmp suffix.
+ *
+ * When the output stream is closed, it is flushed, fsynced, and will be moved
+ * into place, overwriting any file that already exists at that location.
+ *
+ * <b>NOTE</b>: on Windows platforms, it will not atomically replace the target
+ * file - instead the target file is deleted before this one is moved into
+ * place.
+ */
+ public class AtomicFileOutputStream extends FilterOutputStream {
+ private static final String TMP_EXTENSION = ".tmp"; // appended to the destination name while writing
+
+ private final static Logger LOG = LoggerFactory
+ .getLogger(AtomicFileOutputStream.class);
+
+ private final File origFile; // absolute path of the destination file
+ private final File tmpFile; // absolute path of the temporary file that receives all writes
+
+ public AtomicFileOutputStream(File f) throws FileNotFoundException {
+ // The tmp file path is built twice (here and for tmpFile below):
+ // Java does not allow assigning a field or local
+ // before calling super
+ super(new FileOutputStream(new File(f.getParentFile(), f.getName()
+ + TMP_EXTENSION)));
+ origFile = f.getAbsoluteFile();
+ tmpFile = new File(f.getParentFile(), f.getName() + TMP_EXTENSION)
+ .getAbsoluteFile();
+ }
+
+ @Override
+ public void close() throws IOException { // commit: flush, fsync, close, then move tmp over the destination
+ boolean triedToClose = false, success = false;
+ try {
+ flush();
+ ((FileOutputStream) out).getChannel().force(true); // fsync: content and metadata
+
+ triedToClose = true; // set before the call so a failing close is not retried below
+ super.close();
+ success = true;
+ } finally {
+ if (success) {
+ boolean renamed = tmpFile.renameTo(origFile);
+ if (!renamed) {
+ // On windows, renameTo does not replace: delete the target, then retry once.
+ if (!origFile.delete() || !tmpFile.renameTo(origFile)) {
+ throw new IOException(
+ "Could not rename temporary file " + tmpFile
+ + " to " + origFile);
+ }
+ }
+ } else {
+ if (!triedToClose) {
+ // flush() or force() failed before super.close() ran: close the
+ // underlying stream (errors ignored) to not leak an FD
+ IOUtils.closeStream(out);
+ }
+ // close wasn't successful: delete the tmp file, the destination stays untouched
+ if (!tmpFile.delete()) {
+ LOG.warn("Unable to delete tmp file " + tmpFile);
+ }
+ }
+ }
+ }
+
+ /**
+ * Close the atomic file, but do not "commit" the temporary file on top of
+ * the destination; close/delete failures are logged, not thrown. Use this if writing failed.
+ */
+ public void abort() {
+ try {
+ super.close();
+ } catch (IOException ioe) {
+ LOG.warn("Unable to abort file " + tmpFile, ioe);
+ }
+ if (!tmpFile.delete()) {
+ LOG.warn("Unable to delete tmp file during abort " + tmpFile);
+ }
+ }
+ }
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/common/IOUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/common/IOUtils.java?rev=1362656&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/common/IOUtils.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/common/IOUtils.java Tue Jul 17 21:19:38 2012
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.common;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import org.slf4j.Logger;
+
+/*
+ * This code is originally from HDFS, see the similarly named files there
+ * in case of bug fixing, history, etc...
+ */
+
+public class IOUtils {
+ /**
+ * Closes the stream ignoring {@link IOException}. Must only be called in
+ * cleaning up from exception handlers.
+ *
+ * @param stream
+ * the Stream to close
+ */
+ public static void closeStream(Closeable stream) {
+ cleanup(null, stream);
+ }
+
+ /**
+ * Close the Closeable objects and <b>ignore</b> any {@link IOException} or
+ * null pointers. Must only be used for cleanup in exception handlers.
+ *
+ * @param log
+ * the log to record problems to at debug level. Can be null.
+ * @param closeables
+ * the objects to close
+ */
+ public static void cleanup(Logger log, Closeable... closeables) {
+ for (Closeable c : closeables) {
+ if (c != null) {
+ try {
+ c.close();
+ } catch (IOException e) {
+ if (log != null) {
+ log.warn("Exception in closing " + c, e);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Copies from one stream to another.
+ *
+ * @param in
+ * InputStrem to read from
+ * @param out
+ * OutputStream to write to
+ * @param buffSize
+ * the size of the buffer
+ * @param close
+ * whether or not close the InputStream and OutputStream at the
+ * end. The streams are closed in the finally clause.
+ */
+ public static void copyBytes(InputStream in, OutputStream out,
+ int buffSize, boolean close) throws IOException {
+ try {
+ copyBytes(in, out, buffSize);
+ if (close) {
+ out.close();
+ out = null;
+ in.close();
+ in = null;
+ }
+ } finally {
+ if (close) {
+ closeStream(out);
+ closeStream(in);
+ }
+ }
+ }
+
+ /**
+ * Copies from one stream to another.
+ *
+ * @param in
+ * InputStrem to read from
+ * @param out
+ * OutputStream to write to
+ * @param buffSize
+ * the size of the buffer
+ */
+ public static void copyBytes(InputStream in, OutputStream out, int buffSize)
+ throws IOException {
+ PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
+ byte buf[] = new byte[buffSize];
+ int bytesRead = in.read(buf);
+ while (bytesRead >= 0) {
+ out.write(buf, 0, bytesRead);
+ if ((ps != null) && ps.checkError()) {
+ throw new IOException("Unable to write to output stream.");
+ }
+ bytesRead = in.read(buf);
+ }
+ }
+
+}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1362656&r1=1362655&r2=1362656&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Tue Jul 17 21:19:38 2012
@@ -21,7 +21,6 @@ import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
@@ -39,6 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.zookeeper.common.AtomicFileOutputStream;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
import org.apache.zookeeper.server.ServerCnxnFactory;
@@ -1283,16 +1283,36 @@ public class QuorumPeer extends Thread i
public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";
+ /**
+ * Write a long value, as decimal text, to disk atomically. Either succeeds
+ * or an exception is thrown.
+ * @param name file name, resolved against logFactory.getSnapDir(), to write the long to
+ * @param value the long value to write to the named file
+ * @throws IOException if the file cannot be written atomically
+ */
private void writeLongToFile(String name, long value) throws IOException {
File file = new File(logFactory.getSnapDir(), name);
- FileOutputStream out = new FileOutputStream(file);
+ AtomicFileOutputStream out = new AtomicFileOutputStream(file);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out));
+ boolean aborted = false;
try {
bw.write(Long.toString(value));
bw.flush();
- out.getFD().sync();
+
+ out.flush();
+ } catch (IOException e) {
+ LOG.error("Failed to write new file " + file, e);
+ // abort() leaves the destination untouched; worst case here the tmp
+ // file/resources(fd) are not cleaned up and the caller is notified (IOException)
+ aborted = true;
+ out.abort();
+ throw e;
} finally {
- bw.close();
+ if (!aborted) {
+ // close() commits the tmp file; if the close operation (rename) fails
+ // we'll get notified. worst case the tmp file may still exist
+ out.close();
+ }
}
}
Modified: zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml?rev=1362656&r1=1362655&r2=1362656&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml (original)
+++ zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml Tue Jul 17 21:19:38 2012
@@ -125,4 +125,10 @@
</Or>
</Match>
+ <Match>
+ <Class name="org.apache.zookeeper.server.quorum.QuorumPeer"/>
+ <Bug pattern="OS_OPEN_STREAM" />
+ <Method name="writeLongToFile" />
+ </Match>
+
</FindBugsFilter>
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AtomicFileOutputStreamTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AtomicFileOutputStreamTest.java?rev=1362656&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AtomicFileOutputStreamTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AtomicFileOutputStreamTest.java Tue Jul 17 21:19:38 2012
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.common.AtomicFileOutputStream;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+ public class AtomicFileOutputStreamTest extends ZKTestCase {
+ private static final String TEST_STRING = "hello world";
+ private static final String TEST_STRING_2 = "goodbye world";
+
+ private File testDir; // fresh temporary directory created for each test
+ private File dstFile; // destination path inside testDir; not created by setup
+
+ @Before
+ public void setupTestDir() throws IOException {
+ testDir = ClientBase.createTmpDir();
+ dstFile = new File(testDir, "test.txt");
+ }
+ @After
+ public void cleanupTestDir() throws IOException {
+ ClientBase.recursiveDelete(testDir);
+ }
+
+ /**
+ * Test case where there is no existing file
+ */
+ @Test
+ public void testWriteNewFile() throws IOException {
+ OutputStream fos = new AtomicFileOutputStream(dstFile);
+ assertFalse(dstFile.exists());
+ fos.write(TEST_STRING.getBytes());
+ fos.flush();
+ assertFalse(dstFile.exists()); // flushing alone must not publish the file
+ fos.close();
+ assertTrue(dstFile.exists());
+
+ String readBackData = ClientBase.readFile(dstFile);
+ assertEquals(TEST_STRING, readBackData);
+ }
+
+ /**
+ * Test case where an (empty) file already exists and gets overwritten
+ */
+ @Test
+ public void testOverwriteFile() throws IOException {
+ assertTrue("Creating empty dst file", dstFile.createNewFile());
+
+ OutputStream fos = new AtomicFileOutputStream(dstFile);
+
+ assertTrue("Empty file still exists", dstFile.exists());
+ fos.write(TEST_STRING.getBytes());
+ fos.flush();
+
+ // Original contents still in place
+ assertEquals("", ClientBase.readFile(dstFile));
+
+ fos.close();
+
+ // New contents replace original file
+ String readBackData = ClientBase.readFile(dstFile);
+ assertEquals(TEST_STRING, readBackData);
+ }
+
+ /**
+ * Test case where the flush() fails at close time - make sure that we clean
+ * up after ourselves and don't touch any existing file at the destination
+ */
+ @Test
+ public void testFailToFlush() throws IOException {
+ // Create a file at destination
+ FileOutputStream fos = new FileOutputStream(dstFile);
+ fos.write(TEST_STRING_2.getBytes());
+ fos.close();
+
+ OutputStream failingStream = createFailingStream();
+ failingStream.write(TEST_STRING.getBytes());
+ try {
+ failingStream.close();
+ fail("Close didn't throw exception");
+ } catch (IOException ioe) {
+ // expected
+ }
+
+ // Should not have touched original file
+ assertEquals(TEST_STRING_2, ClientBase.readFile(dstFile));
+
+ assertEquals("Temporary file should have been cleaned up",
+ dstFile.getName(), ClientBase.join(",", testDir.list()));
+ }
+
+ /**
+ * Create a stream that fails to flush at close time
+ */
+ private OutputStream createFailingStream() throws FileNotFoundException {
+ return new AtomicFileOutputStream(dstFile) {
+ @Override
+ public void flush() throws IOException {
+ throw new IOException("injected failure");
+ }
+ };
+ }
+
+ /**
+ * Ensure the tmp file is cleaned up and dstFile is not created when
+ * aborting a new file.
+ */
+ @Test
+ public void testAbortNewFile() throws IOException {
+ AtomicFileOutputStream fos = new AtomicFileOutputStream(dstFile);
+
+ fos.abort();
+
+ assertEquals(0, testDir.list().length);
+ }
+
+ /**
+ * Ensure the tmp file is cleaned up and dstFile is not created when
+ * aborting a new file after data has been written and flushed.
+ */
+ @Test
+ public void testAbortNewFileAfterFlush() throws IOException {
+ AtomicFileOutputStream fos = new AtomicFileOutputStream(dstFile);
+ fos.write(TEST_STRING.getBytes());
+ fos.flush();
+
+ fos.abort();
+
+ assertEquals(0, testDir.list().length);
+ }
+
+ /**
+ * Ensure the tmp file is cleaned up and dstFile is untouched when
+ * aborting an existing file overwrite.
+ */
+ @Test
+ public void testAbortExistingFile() throws IOException {
+ FileOutputStream fos1 = new FileOutputStream(dstFile);
+ fos1.write(TEST_STRING.getBytes());
+ fos1.close();
+
+ AtomicFileOutputStream fos2 = new AtomicFileOutputStream(dstFile);
+
+ fos2.abort();
+
+ // Should not have touched original file
+ assertEquals(TEST_STRING, ClientBase.readFile(dstFile));
+ assertEquals(1, testDir.list().length); // only dstFile remains, no tmp file
+ }
+
+ /**
+ * Ensure the tmp file is cleaned up and dstFile is untouched when aborting
+ * an existing file overwrite after new data has been written and flushed.
+ */
+ @Test
+ public void testAbortExistingFileAfterFlush() throws IOException {
+ FileOutputStream fos1 = new FileOutputStream(dstFile);
+ fos1.write(TEST_STRING.getBytes());
+ fos1.close();
+
+ AtomicFileOutputStream fos2 = new AtomicFileOutputStream(dstFile);
+ fos2.write(TEST_STRING_2.getBytes());
+ fos2.flush();
+
+ fos2.abort();
+
+ // Should not have touched original file
+ assertEquals(TEST_STRING, ClientBase.readFile(dstFile));
+ assertEquals(1, testDir.list().length); // only dstFile remains, no tmp file
+ }
+ }
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=1362656&r1=1362655&r2=1362656&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Tue Jul 17 21:19:38 2012
@@ -20,7 +20,10 @@ package org.apache.zookeeper.test;
import static org.apache.zookeeper.client.FourLetterWordMain.send4LetterWord;
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
@@ -44,19 +47,19 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.IOUtils;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactoryAccessor;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnLog;
import org.apache.zookeeper.server.quorum.QuorumPeer;
-
-import static org.apache.zookeeper.client.FourLetterWordMain.send4LetterWord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import com.sun.management.UnixOperatingSystemMXBean;
public abstract class ClientBase extends ZKTestCase {
@@ -569,4 +572,28 @@ public abstract class ClientBase extends
}
}
}
+
+ public static String readFile(File file) throws IOException { // whole file, platform default charset
+ final ByteArrayOutputStream contents = new ByteArrayOutputStream();
+ final BufferedInputStream source = new BufferedInputStream(new FileInputStream(file));
+ try {
+ IOUtils.copyBytes(source, contents, 1024, true); // true: copyBytes closes both streams
+ } finally {
+ source.close(); // already closed by copyBytes; the repeat is harmless
+ }
+ return contents.toString();
+ }
+
+ /** Concatenates the string forms of {@code parts}, with {@code separator} between consecutive elements. */
+ public static String join(String separator, Object[] parts) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < parts.length; i++) {
+ if (i > 0) {
+ // separator goes between elements only, never before the first
+ sb.append(separator);
+ }
+ sb.append(parts[i]);
+ }
+ return sb.toString();
+ }
}