You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by un...@apache.org on 2014/09/19 11:18:48 UTC

svn commit: r1626154 - in /jackrabbit/branches/2.8: jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/ jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/db/ jackr...

Author: unico
Date: Fri Sep 19 09:18:48 2014
New Revision: 1626154

URL: http://svn.apache.org/r1626154
Log:
JCR-3811 
 - Add a new ResettableTempFileInputStream that can be reset to the beginning of the stream to allow re-reading
 - Simplify TempFileInputStream so that its only responsibility is removing the temporary file when the stream is closed
  

Added:
    jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/db/ResettableTempFileInputStream.java
      - copied unchanged from r1625518, jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/db/ResettableTempFileInputStream.java
    jackrabbit/branches/2.8/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/db/
      - copied from r1625518, jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/db/
Removed:
    jackrabbit/branches/2.8/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/TempFileInputStreamTest.java
Modified:
    jackrabbit/branches/2.8/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java
    jackrabbit/branches/2.8/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/TestAll.java
    jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java
    jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/db/TempFileInputStream.java
    jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/db/ConnectionHelper.java
    jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/db/StreamWrapper.java

Modified: jackrabbit/branches/2.8/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/2.8/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java?rev=1626154&r1=1626153&r2=1626154&view=diff
==============================================================================
--- jackrabbit/branches/2.8/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java (original)
+++ jackrabbit/branches/2.8/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java Fri Sep 19 09:18:48 2014
@@ -16,10 +16,7 @@
  */
 package org.apache.jackrabbit.core.journal;
 
-import org.apache.jackrabbit.core.data.db.TempFileInputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -28,7 +25,10 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.BufferedOutputStream;
+
+import org.apache.jackrabbit.core.data.db.ResettableTempFileInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default temporary record used for appending to some journal.
@@ -298,7 +298,7 @@ public class AppendRecord extends Abstra
     private InputStream openInput() throws JournalException {
         if (file != null) {
             try {
-                return new TempFileInputStream(file, true);
+                return new ResettableTempFileInputStream(file);
             } catch (IOException e) {
                 String msg = "Unable to open file input on: " + file.getPath();
                 throw new JournalException(msg, e);

Modified: jackrabbit/branches/2.8/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/TestAll.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/2.8/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/TestAll.java?rev=1626154&r1=1626153&r2=1626154&view=diff
==============================================================================
--- jackrabbit/branches/2.8/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/TestAll.java (original)
+++ jackrabbit/branches/2.8/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/TestAll.java Fri Sep 19 09:18:48 2014
@@ -48,7 +48,6 @@ public class TestAll extends TestCase {
         suite.addTestSuite(NodeTypeTest.class);
         suite.addTestSuite(OpenFilesTest.class);
         suite.addTestSuite(PersistenceManagerIteratorTest.class);
-        suite.addTestSuite(TempFileInputStreamTest.class);
         suite.addTestSuite(TestTwoGetStreams.class);
         suite.addTestSuite(WriteWhileReadingTest.class);
         suite.addTestSuite(GCSubtreeMoveTest.class);

Modified: jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java?rev=1626154&r1=1626153&r2=1626154&view=diff
==============================================================================
--- jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java (original)
+++ jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java Fri Sep 19 09:18:48 2014
@@ -16,6 +16,7 @@
  */
 package org.apache.jackrabbit.core.data.db;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.CountingInputStream;
 import org.apache.jackrabbit.core.data.AbstractDataStore;
 import org.apache.jackrabbit.core.data.DataIdentifier;
@@ -35,8 +36,10 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.ref.WeakReference;
 import java.security.DigestInputStream;
 import java.security.MessageDigest;
@@ -349,7 +352,7 @@ public class DbDataStore extends Abstrac
             } else if (STORE_TEMP_FILE.equals(storeStream)) {
                 File temp = moveToTempFile(in);
                 long length = temp.length();
-                wrapper = new StreamWrapper(new TempFileInputStream(temp, true), length);
+                wrapper = new StreamWrapper(new ResettableTempFileInputStream(temp), length);
             } else {
                 throw new DataStoreException("Unsupported stream store algorithm: " + storeStream);
             }
@@ -471,10 +474,20 @@ public class DbDataStore extends Abstrac
      */
     private File moveToTempFile(InputStream in) throws IOException {
         File temp = File.createTempFile("dbRecord", null);
-        TempFileInputStream.writeToFileAndClose(in, temp);
+        writeToFileAndClose(in, temp);
         return temp;
     }
 
+    private void writeToFileAndClose(InputStream in, File file) throws IOException {
+        OutputStream out = new FileOutputStream(file);
+        try {
+            IOUtils.copy(in, out);
+        } finally {
+            IOUtils.closeQuietly(out);
+            IOUtils.closeQuietly(in);
+        }
+    }
+
     public synchronized void deleteRecord(DataIdentifier identifier) throws DataStoreException {
         try {
             conHelper.exec(deleteSQL, identifier.toString());
@@ -586,7 +599,7 @@ public class DbDataStore extends Abstrac
             } else if (copyWhenReading) {
                 // If we copy while reading, create a temp file and close the stream
                 File temp = moveToTempFile(stream);
-                stream = new BufferedInputStream(new TempFileInputStream(temp, false));
+                stream = new BufferedInputStream(new TempFileInputStream(temp));
                 DbUtility.close(rs);
             } else {
                 stream = new BufferedInputStream(stream);

Modified: jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/db/TempFileInputStream.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/db/TempFileInputStream.java?rev=1626154&r1=1626153&r2=1626154&view=diff
==============================================================================
--- jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/db/TempFileInputStream.java (original)
+++ jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/db/TempFileInputStream.java Fri Sep 19 09:18:48 2014
@@ -16,133 +16,42 @@
  */
 package org.apache.jackrabbit.core.data.db;
 
-import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
+import java.io.FilterInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.input.AutoCloseInputStream;
+import org.apache.commons.io.input.ClosedInputStream;
 
 /**
  * An input stream from a temporary file. The file is deleted when the stream is
- * closed, fully read, or garbage collected.
- * <p>
- * This class does not support mark/reset. It is always to be wrapped
- * using a BufferedInputStream.
+ * closed or garbage collected.
  */
-public class TempFileInputStream extends AutoCloseInputStream {
+public class TempFileInputStream extends FilterInputStream {
 
     private final File file;
-    private boolean closed;
-    private boolean delayedResourceCleanup = true;
 
-    /**
-     * Copy the data to a file and close the input stream afterwards.
-     *
-     * @param in the input stream
-     * @param file the target file
-     * @return the size of the file
-     */
-    public static long writeToFileAndClose(InputStream in, File file) throws IOException {
-        OutputStream out = new FileOutputStream(file);
-        IOUtils.copy(in, out);
-        out.close();
-        in.close();
-        return file.length();
+    public TempFileInputStream(File file) throws FileNotFoundException {
+        this(new FileInputStream(file), file);
     }
 
-    /**
-     * Construct a new temporary file input stream.
-     * The file is deleted if the input stream is closed or fully read and 
-     * delayedResourceCleanup was set to true. Otherwise you must call {@link #deleteFile()}.
-     * Deleting is only attempted once.
-     *
-     * @param file the temporary file
-     * @param delayedResourceCleanup
-     */
-    public TempFileInputStream(File file, boolean delayedResourceCleanup) throws FileNotFoundException {
-        super(new BufferedInputStream(new FileInputStream(file)));
+    protected TempFileInputStream(FileInputStream in, File file) {
+        super(in);
         this.file = file;
-        this.delayedResourceCleanup = delayedResourceCleanup;
-    }
-
-    public File getFile() {
-    	return file;
-    }
-    
-    public void deleteFile() {
-	    file.delete();
-	}
-
-	private int closeIfEOF(int read) throws IOException {
-        if (read < 0) {
-            close();
-        }
-        return read;
     }
 
+    @Override
     public void close() throws IOException {
-        if (!closed) {
-            in.close();
-            if (!delayedResourceCleanup) {
-            	deleteFile();
-            }
-            closed = true;
-        }
-    }
-
-    public int available() throws IOException {
-        return in.available();
-    }
-
-    /**
-     * This method does nothing.
-     */
-    public void mark(int readlimit) {
-        // do nothing
-    }
-
-    /**
-     * Check whether mark and reset are supported.
-     *
-     * @return false
-     */
-    public boolean markSupported() {
-        return false;
-    }
-
-    public long skip(long n) throws IOException {
-        return in.skip(n);
-    }
-
-    public void reset() throws IOException {
-        in.reset();
-    }
-
-    public int read(byte[] b, int off, int len) throws IOException {
-        if (closed) {
-            return -1;
-        }
-        return closeIfEOF(in.read(b, off, len));
-    }
-
-    public int read(byte[] b) throws IOException {
-        if (closed) {
-            return -1;
-        }
-        return closeIfEOF(in.read(b));
+        in.close();
+        in = new ClosedInputStream();
+        file.delete();
     }
 
-    public int read() throws IOException {
-        if (closed) {
-            return -1;
-        }
-        return closeIfEOF(in.read());
+    @Override
+    protected void finalize() throws Throwable {
+        close();
+        super.finalize();
     }
 
 }

Modified: jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/db/ConnectionHelper.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/db/ConnectionHelper.java?rev=1626154&r1=1626153&r2=1626154&view=diff
==============================================================================
--- jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/db/ConnectionHelper.java (original)
+++ jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/db/ConnectionHelper.java Fri Sep 19 09:18:48 2014
@@ -547,7 +547,8 @@ public class ConnectionHelper {
                         log.error("Failed to execute SQL (stacktrace on DEBUG log level): " + lastException);
                         log.debug("Failed to execute SQL", lastException);
                         if (!resetParamResources()) {
-                            break; // don't try again if streams cannot be reset
+                            log.warn("Could not reset parameters: not retrying SQL call");
+                            break;
                         }
                         failures++;
                         if (blockOnConnectionLoss || failures <= RETRIES) { // if we're going to try again
@@ -571,15 +572,13 @@ public class ConnectionHelper {
 
 		/**
 		 * Cleans up the Parameter resources that are not automatically closed or deleted.
-		 *
-		 * @param params
 		 */
 		protected void cleanupParamResources() {
 		    for (int i = 0; params != null && i < params.length; i++) {
 		        Object p = params[i];
 		        if (p instanceof StreamWrapper) {
 		            StreamWrapper wrapper = (StreamWrapper) p;
-		            wrapper.cleanupResources();
+		            wrapper.closeStream();
 		        }
 		    }
 		}

Modified: jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/db/StreamWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/db/StreamWrapper.java?rev=1626154&r1=1626153&r2=1626154&view=diff
==============================================================================
--- jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/db/StreamWrapper.java (original)
+++ jackrabbit/branches/2.8/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/db/StreamWrapper.java Fri Sep 19 09:18:48 2014
@@ -16,20 +16,20 @@
  */
 package org.apache.jackrabbit.core.util.db;
 
-import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.sql.SQLException;
 
-import org.apache.jackrabbit.core.data.db.TempFileInputStream;
+import org.apache.commons.io.input.CloseShieldInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StreamWrapper {
 
-    static Logger log = LoggerFactory.getLogger(StreamWrapper.class);
+    private final Logger log = LoggerFactory.getLogger(StreamWrapper.class);
 
-    private InputStream stream;
+    private MarkDetectingInputStream stream;
     private final long size;
 
     /**
@@ -37,65 +37,69 @@ public class StreamWrapper {
      * safely be passed as a parameter to the {@link ConnectionHelper#exec(String, Object...)},
      * {@link ConnectionHelper#exec(String, Object[], boolean, int)} and
      * {@link ConnectionHelper#update(String, Object[])} methods.
-     * If the wrapped Stream is a {@link TempFileInputStream} it will be wrapped again by a {@link BufferedInputStream}.
-     * 
+     *
      * @param in the InputStream to wrap
      * @param size the size of the input stream
      */
     public StreamWrapper(InputStream in, long size) {
-        this.stream = in;
+        this.stream = new MarkDetectingInputStream(in);
         this.size = size;
     }
     
     public InputStream getStream() {
-        if (stream instanceof TempFileInputStream) {
-            return new BufferedInputStream(stream);
-        }
-        return stream;
+        return new CloseShieldInputStream(stream);
     }
     
     public long getSize() {
         return size;
     }
 
-    /**
-     * Cleans up the internal Resources
-     */
-	public void cleanupResources() {
-        if (stream instanceof TempFileInputStream) {
-        	try {
-        		stream.close();
-        		((TempFileInputStream) stream).deleteFile();
-        	} catch (IOException e) {
-        		log.warn("Unable to cleanup the TempFileInputStream");
-        	}
+    public void closeStream() {
+        try {
+            stream.close();
+        } catch (IOException e) {
+            log.error("Error while closing stream", e);
         }
-	}
+    }
 
     /**
      * Resets the internal InputStream that it could be re-read.<br>
      * Is used from {@link RetryManager} if a {@link SQLException} has occurred.<br>
+     * It relies on the assumption that the InputStream was not marked anywhere
+     * during reading.
      *
      * @return returns true if it was able to reset the Stream
      */
     public boolean resetStream() {
-    	if (stream instanceof TempFileInputStream) {
-    		try {
-	    		TempFileInputStream tempFileInputStream = (TempFileInputStream) stream;
-	    		// Close it if it is not already closed ...
-	    		tempFileInputStream.close();
-    			stream = new TempFileInputStream(tempFileInputStream.getFile(), true);
-    			return true;
-    		} catch (Exception e) {
-    			log.warn("Failed to create a new TempFileInputStream", e);
-    		}
-            return false;
-    	}
         try {
-            stream.reset();
-            return true;
+            if (!stream.isMarked()) {
+                stream.reset();
+                return true;
+            } else {
+                log.warn("Cannot reset stream to the beginning because it was marked.");
+                return false;
+            }
         } catch (IOException e) {
             return false;
         }
 	}
+
+    private static class MarkDetectingInputStream extends FilterInputStream {
+
+        private boolean marked;
+
+        protected MarkDetectingInputStream(final InputStream in) {
+            super(in);
+        }
+
+        @Override
+        public synchronized void mark(final int readlimit) {
+            super.mark(readlimit);
+            marked = true;
+        }
+
+        private boolean isMarked() {
+            return marked;
+        }
+    }
 }