You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2012/07/17 23:19:39 UTC

svn commit: r1362656 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/common/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/config/ src/java/test/org/apache/zookeeper/test/

Author: phunt
Date: Tue Jul 17 21:19:38 2012
New Revision: 1362656

URL: http://svn.apache.org/viewvc?rev=1362656&view=rev
Log:
ZOOKEEPER-1427. Writing to local files is done non-atomically (phunt)

Added:
    zookeeper/trunk/src/java/main/org/apache/zookeeper/common/AtomicFileOutputStream.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/common/IOUtils.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AtomicFileOutputStreamTest.java
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1362656&r1=1362655&r2=1362656&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Tue Jul 17 21:19:38 2012
@@ -207,6 +207,8 @@ BUGFIXES:
     takes a long time with large datasets - is correlated to dataset size
     (fpj and Thawan Kooburat via camille)
 
+  ZOOKEEPER-1427. Writing to local files is done non-atomically (phunt)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/common/AtomicFileOutputStream.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/common/AtomicFileOutputStream.java?rev=1362656&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/common/AtomicFileOutputStream.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/common/AtomicFileOutputStream.java Tue Jul 17 21:19:38 2012
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.common;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * This code is originally from HDFS, see the similarly named files there
+ * in case of bug fixing, history, etc...
+ */
+
+/**
+ * A FileOutputStream that has the property that it will only show up at its
+ * destination once it has been entirely written and flushed to disk. While
+ * being written, it will use a .tmp suffix.
+ *
+ * When the output stream is closed, it is flushed, fsynced, and will be moved
+ * into place, overwriting any file that already exists at that location.
+ *
+ * If writing fails, call {@link #abort()} instead of {@link #close()}: the
+ * temporary file is deleted and the destination is left untouched.
+ *
+ * <b>NOTE</b>: on Windows platforms, it will not atomically replace the target
+ * file - instead the target file is deleted before this one is moved into
+ * place.
+ */
+public class AtomicFileOutputStream extends FilterOutputStream {
+    private static final String TMP_EXTENSION = ".tmp";
+
+    private final static Logger LOG = LoggerFactory
+            .getLogger(AtomicFileOutputStream.class);
+
+    /** Absolute path of the destination file. */
+    private final File origFile;
+    /** Absolute path of the temporary file that receives the writes. */
+    private final File tmpFile;
+
+    /**
+     * Opens a stream that writes to a temporary sibling of {@code f} (same
+     * directory, name plus ".tmp"); {@code f} itself is only replaced when
+     * {@link #close()} succeeds.
+     *
+     * @param f the destination file
+     * @throws FileNotFoundException if the temporary file cannot be opened
+     *             for writing
+     */
+    public AtomicFileOutputStream(File f) throws FileNotFoundException {
+        // Nothing can be assigned before super(), so the tmp path is derived
+        // through a static helper both here and for the tmpFile field.
+        super(new FileOutputStream(tmpFileFor(f)));
+        origFile = f.getAbsoluteFile();
+        tmpFile = tmpFileFor(f).getAbsoluteFile();
+    }
+
+    /**
+     * Returns the temporary file used while writing {@code f}: a sibling of
+     * {@code f} whose name carries the ".tmp" suffix.
+     */
+    private static File tmpFileFor(File f) {
+        return new File(f.getParentFile(), f.getName() + TMP_EXTENSION);
+    }
+
+    /**
+     * Passes the whole range to the underlying stream in one call.
+     * {@link FilterOutputStream#write(byte[], int, int)} would otherwise
+     * forward the data one byte at a time.
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        out.write(b, off, len);
+    }
+
+    /**
+     * Flushes and fsyncs the data, closes the temporary file and renames it
+     * over the destination. If anything before the rename fails, deleting
+     * the temporary file is attempted and the destination is untouched.
+     *
+     * @throws IOException if flushing, syncing or closing fails, or if the
+     *             temporary file cannot be moved into place (in which case
+     *             the temporary file is kept)
+     */
+    @Override
+    public void close() throws IOException {
+        boolean triedToClose = false, success = false;
+        try {
+            flush();
+            // force content and metadata to storage before the rename
+            ((FileOutputStream) out).getChannel().force(true);
+
+            triedToClose = true;
+            super.close();
+            success = true;
+        } finally {
+            if (success) {
+                boolean renamed = tmpFile.renameTo(origFile);
+                if (!renamed) {
+                    // On windows, renameTo does not replace.
+                    if (!origFile.delete() || !tmpFile.renameTo(origFile)) {
+                        throw new IOException(
+                                "Could not rename temporary file " + tmpFile
+                                        + " to " + origFile);
+                    }
+                }
+            } else {
+                if (!triedToClose) {
+                    // If we failed when flushing, try to close it to not leak
+                    // an FD
+                    IOUtils.closeStream(out);
+                }
+                // close wasn't successful, try to delete the tmp file
+                if (!tmpFile.delete()) {
+                    LOG.warn("Unable to delete tmp file " + tmpFile);
+                }
+            }
+        }
+    }
+
+    /**
+     * Close the atomic file, but do not "commit" the temporary file on top of
+     * the destination. This should be used if there is a failure in writing.
+     * Does not throw IOException: close and delete failures are only logged.
+     */
+    public void abort() {
+        try {
+            super.close();
+        } catch (IOException ioe) {
+            LOG.warn("Unable to abort file " + tmpFile, ioe);
+        }
+        if (!tmpFile.delete()) {
+            LOG.warn("Unable to delete tmp file during abort " + tmpFile);
+        }
+    }
+}

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/common/IOUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/common/IOUtils.java?rev=1362656&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/common/IOUtils.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/common/IOUtils.java Tue Jul 17 21:19:38 2012
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.common;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import org.slf4j.Logger;
+
+/*
+ * This code is originally from HDFS, see the similarly named files there
+ * in case of bug fixing, history, etc...
+ */
+
+/**
+ * Static helpers for copying between streams and for best-effort closing of
+ * {@link Closeable} resources.
+ */
+public class IOUtils {
+    /**
+     * Closes the stream ignoring {@link IOException}. Must only be called in
+     * cleaning up from exception handlers.
+     *
+     * @param stream
+     *            the Stream to close; may be null
+     */
+    public static void closeStream(Closeable stream) {
+        cleanup(null, stream);
+    }
+
+    /**
+     * Close the Closeable objects and <b>ignore</b> any {@link IOException} or
+     * null pointers. Must only be used for cleanup in exception handlers.
+     *
+     * @param log
+     *            the log to record problems to at warn level. Can be null,
+     *            in which case failures are silently ignored.
+     * @param closeables
+     *            the objects to close; null elements are skipped
+     */
+    public static void cleanup(Logger log, Closeable... closeables) {
+        for (Closeable c : closeables) {
+            if (c != null) {
+                try {
+                    c.close();
+                } catch (IOException e) {
+                    if (log != null) {
+                        log.warn("Exception in closing " + c, e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Copies from one stream to another.
+     *
+     * @param in
+     *            InputStream to read from
+     * @param out
+     *            OutputStream to write to
+     * @param buffSize
+     *            the size of the buffer; must be positive
+     * @param close
+     *            whether or not close the InputStream and OutputStream at the
+     *            end. The streams are closed in the finally clause, so they
+     *            are closed even if the copy fails.
+     * @throws IOException if reading, writing or closing fails
+     * @throws IllegalArgumentException if {@code buffSize} is not positive
+     */
+    public static void copyBytes(InputStream in, OutputStream out,
+            int buffSize, boolean close) throws IOException {
+        try {
+            copyBytes(in, out, buffSize);
+            if (close) {
+                // close explicitly so that close() failures propagate; the
+                // references are nulled so the finally block skips them
+                out.close();
+                out = null;
+                in.close();
+                in = null;
+            }
+        } finally {
+            if (close) {
+                // quietly closes whatever is still open after a failure
+                closeStream(out);
+                closeStream(in);
+            }
+        }
+    }
+
+    /**
+     * Copies from one stream to another until the end of the input stream.
+     * Neither stream is closed.
+     *
+     * @param in
+     *            InputStream to read from
+     * @param out
+     *            OutputStream to write to
+     * @param buffSize
+     *            the size of the buffer; must be positive
+     * @throws IOException if reading or writing fails, or if {@code out} is
+     *             a {@link PrintStream} whose error flag gets set
+     * @throws IllegalArgumentException if {@code buffSize} is not positive
+     */
+    public static void copyBytes(InputStream in, OutputStream out, int buffSize)
+            throws IOException {
+        if (buffSize <= 0) {
+            // a zero-length buffer makes read() return 0 forever (endless
+            // loop) and a negative size cannot be allocated
+            throw new IllegalArgumentException(
+                    "buffSize must be positive: " + buffSize);
+        }
+        // PrintStream never throws IOException, so poll its error flag
+        PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
+        byte[] buf = new byte[buffSize];
+        int bytesRead = in.read(buf);
+        while (bytesRead >= 0) {
+            out.write(buf, 0, bytesRead);
+            if ((ps != null) && ps.checkError()) {
+                throw new IOException("Unable to write to output stream.");
+            }
+            bytesRead = in.read(buf);
+        }
+    }
+
+}

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1362656&r1=1362655&r2=1362656&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Tue Jul 17 21:19:38 2012
@@ -21,7 +21,6 @@ import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
@@ -39,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.zookeeper.common.AtomicFileOutputStream;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.jmx.ZKMBeanInfo;
 import org.apache.zookeeper.server.ServerCnxnFactory;
@@ -1283,16 +1283,38 @@ public class QuorumPeer extends Thread i
 
     public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";
 
+    /**
+     * Write a long value to disk atomically. Either succeeds or an exception
+     * is thrown.
+     * @param name file name (in the snapshot directory) to write the long to
+     * @param value the long value to write to the named file
+     * @throws IOException if the file cannot be written atomically
+     */
     private void writeLongToFile(String name, long value) throws IOException {
         File file = new File(logFactory.getSnapDir(), name);
-        FileOutputStream out = new FileOutputStream(file);
+        AtomicFileOutputStream out = new AtomicFileOutputStream(file);
         BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out));
+        // only commit (close) once the value was written completely; any
+        // other exit path, checked or unchecked exception, aborts instead
+        boolean written = false;
         try {
             bw.write(Long.toString(value));
             bw.flush();
-            out.getFD().sync();
+            out.flush();
+            written = true;
+        } catch (IOException e) {
+            LOG.error("Failed to write new file " + file, e);
+            throw e;
         } finally {
-            bw.close();
+            if (written) {
+                // close() fsyncs and renames the tmp file over the target; if
+                // it fails the caller gets the IOException and the tmp file
+                // may be left behind
+                out.close();
+            } else {
+                // discard the tmp file, leaving any existing file untouched
+                out.abort();
+            }
         }
     }
 

Modified: zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml?rev=1362656&r1=1362655&r2=1362656&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml (original)
+++ zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml Tue Jul 17 21:19:38 2012
@@ -125,4 +125,10 @@
     </Or>
   </Match>
 
+  <Match>
+    <Class name="org.apache.zookeeper.server.quorum.QuorumPeer"/>
+    <Bug pattern="OS_OPEN_STREAM" />
+    <Method name="writeLongToFile" />
+  </Match>
+
 </FindBugsFilter>

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AtomicFileOutputStreamTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AtomicFileOutputStreamTest.java?rev=1362656&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AtomicFileOutputStreamTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AtomicFileOutputStreamTest.java Tue Jul 17 21:19:38 2012
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.common.AtomicFileOutputStream;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AtomicFileOutputStreamTest extends ZKTestCase { // commit/abort behaviour of AtomicFileOutputStream
+    private static final String TEST_STRING = "hello world";
+    private static final String TEST_STRING_2 = "goodbye world";
+
+    private File testDir;
+    private File dstFile; // destination file inside testDir
+
+    @Before
+    public void setupTestDir() throws IOException {
+        testDir = ClientBase.createTmpDir();
+        dstFile = new File(testDir, "test.txt");
+    }
+    @After
+    public void cleanupTestDir() throws IOException {
+        ClientBase.recursiveDelete(testDir);
+    }
+
+    /**
+     * Test case where there is no existing file: dstFile only appears on close()
+     */
+    @Test
+    public void testWriteNewFile() throws IOException {
+        OutputStream fos = new AtomicFileOutputStream(dstFile);
+        assertFalse(dstFile.exists());
+        fos.write(TEST_STRING.getBytes());
+        fos.flush();
+        assertFalse(dstFile.exists()); // flushed data is still only in the tmp file
+        fos.close();
+        assertTrue(dstFile.exists());
+
+        String readBackData = ClientBase.readFile(dstFile);
+        assertEquals(TEST_STRING, readBackData);
+    }
+
+    /**
+     * Test case where an existing file is overwritten: old contents stay until close()
+     */
+    @Test
+    public void testOverwriteFile() throws IOException {
+        assertTrue("Creating empty dst file", dstFile.createNewFile());
+
+        OutputStream fos = new AtomicFileOutputStream(dstFile);
+
+        assertTrue("Empty file still exists", dstFile.exists());
+        fos.write(TEST_STRING.getBytes());
+        fos.flush();
+
+        // Original contents still in place
+        assertEquals("", ClientBase.readFile(dstFile));
+
+        fos.close();
+
+        // New contents replace original file
+        String readBackData = ClientBase.readFile(dstFile);
+        assertEquals(TEST_STRING, readBackData);
+    }
+
+    /**
+     * Test case where the flush() fails at close time - make sure that we clean
+     * up after ourselves and don't touch any existing file at the destination
+     */
+    @Test
+    public void testFailToFlush() throws IOException {
+        // Create a file at destination
+        FileOutputStream fos = new FileOutputStream(dstFile);
+        fos.write(TEST_STRING_2.getBytes());
+        fos.close();
+
+        OutputStream failingStream = createFailingStream(); // flush() always throws
+        failingStream.write(TEST_STRING.getBytes());
+        try {
+            failingStream.close();
+            fail("Close didn't throw exception");
+        } catch (IOException ioe) {
+            // expected
+        }
+
+        // Should not have touched original file
+        assertEquals(TEST_STRING_2, ClientBase.readFile(dstFile));
+
+        assertEquals("Temporary file should have been cleaned up",
+                dstFile.getName(), ClientBase.join(",", testDir.list())); // only dstFile may remain
+    }
+
+    /**
+     * Create a stream whose flush() always throws, so close() fails before commit
+     */
+    private OutputStream createFailingStream() throws FileNotFoundException {
+        return new AtomicFileOutputStream(dstFile) {
+            @Override
+            public void flush() throws IOException {
+                throw new IOException("injected failure");
+            }
+        };
+    }
+
+    /**
+     * Ensure the tmp file is cleaned up and dstFile is not created when
+     * aborting a new file.
+     */
+    @Test
+    public void testAbortNewFile() throws IOException {
+        AtomicFileOutputStream fos = new AtomicFileOutputStream(dstFile);
+
+        fos.abort();
+
+        assertEquals(0, testDir.list().length); // neither dstFile nor the tmp file exists
+    }
+
+    /**
+     * Ensure the tmp file is cleaned up and dstFile is not created when
+     * aborting a new file after data has been written and flushed.
+     */
+    @Test
+    public void testAbortNewFileAfterFlush() throws IOException {
+        AtomicFileOutputStream fos = new AtomicFileOutputStream(dstFile);
+        fos.write(TEST_STRING.getBytes());
+        fos.flush();
+
+        fos.abort();
+
+        assertEquals(0, testDir.list().length); // neither dstFile nor the tmp file exists
+    }
+
+    /**
+     * Ensure the tmp file is cleaned up and dstFile is untouched when
+     * aborting an existing file overwrite.
+     */
+    @Test
+    public void testAbortExistingFile() throws IOException {
+        FileOutputStream fos1 = new FileOutputStream(dstFile);
+        fos1.write(TEST_STRING.getBytes());
+        fos1.close();
+
+        AtomicFileOutputStream fos2 = new AtomicFileOutputStream(dstFile);
+
+        fos2.abort();
+
+        // Should not have touched original file
+        assertEquals(TEST_STRING, ClientBase.readFile(dstFile));
+        assertEquals(1, testDir.list().length); // only dstFile, no tmp file
+    }
+
+    /**
+     * Ensure the tmp file is cleaned up and dstFile is untouched when
+     * aborting an existing file overwrite after writing and flushing.
+     */
+    @Test
+    public void testAbortExistingFileAfterFlush() throws IOException {
+        FileOutputStream fos1 = new FileOutputStream(dstFile);
+        fos1.write(TEST_STRING.getBytes());
+        fos1.close();
+
+        AtomicFileOutputStream fos2 = new AtomicFileOutputStream(dstFile);
+        fos2.write(TEST_STRING_2.getBytes());
+        fos2.flush();
+
+        fos2.abort();
+
+        // Should not have touched original file
+        assertEquals(TEST_STRING, ClientBase.readFile(dstFile));
+        assertEquals(1, testDir.list().length); // only dstFile, no tmp file
+    }
+}

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=1362656&r1=1362655&r2=1362656&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Tue Jul 17 21:19:38 2012
@@ -20,7 +20,10 @@ package org.apache.zookeeper.test;
 
 import static org.apache.zookeeper.client.FourLetterWordMain.send4LetterWord;
 
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.OperatingSystemMXBean;
@@ -44,19 +47,19 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.IOUtils;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ServerCnxnFactoryAccessor;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnLog;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
-
-import static org.apache.zookeeper.client.FourLetterWordMain.send4LetterWord;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import com.sun.management.UnixOperatingSystemMXBean;
 
 public abstract class ClientBase extends ZKTestCase {
@@ -569,4 +572,28 @@ public abstract class ClientBase extends
             }
         }
     }
+
+    public static String readFile(File file) throws IOException { // decoded with the platform default charset
+        BufferedInputStream source = new BufferedInputStream(new FileInputStream(file));
+        ByteArrayOutputStream sink = new ByteArrayOutputStream();
+        try {
+            IOUtils.copyBytes(source, sink, 1024, true); // true: copyBytes closes both streams
+        } finally {
+            source.close(); // redundant but harmless: closing twice is a no-op
+        }
+        return sink.toString();
+    }
+
+    public static String join(String separator, Object[] parts) { // join(",", {a, b}) -> "a,b"
+        StringBuilder sb = new StringBuilder();
+        boolean first = true; // no separator before the first part
+        for (Object part : parts) {
+            if (!first) {
+                sb.append(separator);
+            }
+            first = false; // must be cleared outside the if, else no separator is ever added
+            sb.append(part);
+        }
+        return sb.toString();
+    }
 }