You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by gg...@apache.org on 2019/08/19 22:07:25 UTC

[commons-vfs] 02/02: - [VFS-726] getInputStream(int bufferSize) on SftpFileObject effectively ignores buffer size. - [VFS-704] Some providers wrap their input/output streams twice in a BufferedInputStream.

This is an automated email from the ASF dual-hosted git repository.

ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-vfs.git

commit 9fb261e7080db1516c5882c878b5acf07ab8643e
Author: Gary Gregory <ga...@gmail.com>
AuthorDate: Mon Aug 19 15:07:18 2019 -0700

    - [VFS-726] getInputStream(int bufferSize) on SftpFileObject effectively
    ignores buffer size.
    - [VFS-704] Some providers wrap their input/output streams twice in a
    BufferedInputStream.
---
 .../commons/vfs2/provider/DefaultFileContent.java  |  76 ++++++++-
 .../vfs2/provider/FileContentThreadData.java       | 190 +++++++++++----------
 .../commons/vfs2/util/RawMonitorInputStream.java   | 175 +++++++++++++++++++
 src/changes/changes.xml                            |   3 +
 4 files changed, 344 insertions(+), 100 deletions(-)

diff --git a/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/DefaultFileContent.java b/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/DefaultFileContent.java
index 3e1f184..d1f3508 100644
--- a/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/DefaultFileContent.java
+++ b/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/DefaultFileContent.java
@@ -16,6 +16,7 @@
  */
 package org.apache.commons.vfs2.provider;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -34,6 +35,7 @@ import org.apache.commons.vfs2.util.MonitorInputStream;
 import org.apache.commons.vfs2.util.MonitorOutputStream;
 import org.apache.commons.vfs2.util.MonitorRandomAccessContent;
 import org.apache.commons.vfs2.util.RandomAccessMode;
+import org.apache.commons.vfs2.util.RawMonitorInputStream;
 
 /**
  * The content of a file.
@@ -444,10 +446,15 @@ public final class DefaultFileContent implements FileContent {
 
             // Close the input stream
             while (fileContentThreadData.getInstrsSize() > 0) {
-                final FileContentInputStream inputStream = (FileContentInputStream) fileContentThreadData
-                        .removeInstr(0);
+                final InputStream inputStream = fileContentThreadData.removeInputStream(0);
                 try {
-                    inputStream.close();
+                    if (inputStream instanceof FileContentInputStream) {
+                        ((FileContentInputStream) inputStream).close();
+                    } else if (inputStream instanceof RawFileContentInputStream) {
+                        ((RawFileContentInputStream) inputStream).close();
+                    } else {
+                       caught = new FileSystemException("Unsupported InputStream type: " + inputStream);
+                    }
                 } catch (final FileSystemException ex) {
                     caught = ex;
 
@@ -490,14 +497,29 @@ public final class DefaultFileContent implements FileContent {
          * if (getThreadData().getState() == STATE_WRITING || getThreadData().getState() == STATE_RANDOM_ACCESS) { throw
          * new FileSystemException("vfs.provider/read-in-use.error", file); }
          */
-
         // Get the raw input stream
-        final InputStream inputStream = bufferSize == 0 ? fileObject.getInputStream()
+        // @formatter:off
+        final InputStream inputStream = bufferSize == 0 
+                ? fileObject.getInputStream()
                 : fileObject.getInputStream(bufferSize);
+        // @formatter:on
         // Double buffering may take place here.
-        final InputStream wrappedInputStream = bufferSize == 0 
+//        final InputStream wrappedInputStream = bufferSize == 0 
+//                    ? new FileContentInputStream(fileObject, inputStream)
+//                    : new FileContentInputStream(fileObject, inputStream, bufferSize);
+
+        InputStream wrappedInputStream;
+        if (inputStream instanceof BufferedInputStream) {
+            // Don't double buffer.
+            wrappedInputStream = new RawFileContentInputStream(fileObject, inputStream);
+        } else 
+        {
+            // @formatter:off
+            wrappedInputStream = bufferSize == 0 
                     ? new FileContentInputStream(fileObject, inputStream)
                     : new FileContentInputStream(fileObject, inputStream, bufferSize);
+            // @formatter:on
+        }
         getOrCreateThreadData().addInstr(wrappedInputStream);
         streamOpened();
 
@@ -530,7 +552,7 @@ public final class DefaultFileContent implements FileContent {
     /**
      * Handles the end of input stream.
      */
-    private void endInput(final FileContentInputStream instr) {
+    private void endInput(final InputStream instr) {
         final FileContentThreadData fileContentThreadData = threadLocal.get();
         if (fileContentThreadData != null) {
             fileContentThreadData.removeInstr(instr);
@@ -646,6 +668,46 @@ public final class DefaultFileContent implements FileContent {
     }
 
     /**
+     * An input stream for reading content. Provides buffering, and end-of-stream monitoring.
+     * <p>
+     * This is the same as {@link FileContentInputStream} but without the buffering.
+     * </p>
+     */
+    private final class RawFileContentInputStream extends RawMonitorInputStream {
+        // avoid gc
+        private final FileObject file;
+
+        RawFileContentInputStream(final FileObject file, final InputStream instr) {
+            super(instr);
+            this.file = file;
+        }
+
+        /**
+         * Closes this input stream.
+         */
+        @Override
+        public void close() throws FileSystemException {
+            try {
+                super.close();
+            } catch (final IOException e) {
+                throw new FileSystemException("vfs.provider/close-instr.error", file, e);
+            }
+        }
+
+        /**
+         * Called after the stream has been closed.
+         */
+        @Override
+        protected void onClose() throws IOException {
+            try {
+                super.onClose();
+            } finally {
+                endInput(this);
+            }
+        }
+    }
+
+    /**
      * An input/output stream for reading/writing content on random positions
      */
     private final class FileRandomAccessContent extends MonitorRandomAccessContent {
diff --git a/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/FileContentThreadData.java b/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/FileContentThreadData.java
index d736da5..2f08b0f 100644
--- a/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/FileContentThreadData.java
+++ b/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/FileContentThreadData.java
@@ -1,93 +1,97 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.commons.vfs2.provider;
-
-import java.io.InputStream;
-import java.util.ArrayList;
-
-import org.apache.commons.vfs2.FileSystemException;
-import org.apache.commons.vfs2.RandomAccessContent;
-
-/**
- * Holds the data which needs to be local to the current thread
- */
-class FileContentThreadData {
-
-    // private int state = DefaultFileContent.STATE_CLOSED;
-
-    private final ArrayList<InputStream> inputStreamList = new ArrayList<>();
-    private final ArrayList<RandomAccessContent> randomAccessContentList = new ArrayList<>();
-    private DefaultFileContent.FileContentOutputStream outputStream;
-
-    FileContentThreadData() {
-    }
-
-    /*
-     * int getState() { return state; }
-     *
-     * void setState(int state) { this.state = state; }
-     */
-
-    void addInstr(final InputStream inputStream) {
-        this.inputStreamList.add(inputStream);
-    }
-
-    void setOutstr(final DefaultFileContent.FileContentOutputStream outputStream) {
-        this.outputStream = outputStream;
-    }
-
-    DefaultFileContent.FileContentOutputStream getOutstr() {
-        return this.outputStream;
-    }
-
-    void addRastr(final RandomAccessContent randomAccessContent) {
-        this.randomAccessContentList.add(randomAccessContent);
-    }
-
-    int getInstrsSize() {
-        return this.inputStreamList.size();
-    }
-
-    public Object removeInstr(final int pos) {
-        return this.inputStreamList.remove(pos);
-    }
-
-    public void removeInstr(final InputStream inputStream) {
-        this.inputStreamList.remove(inputStream);
-    }
-
-    public Object removeRastr(final int pos) {
-        return this.randomAccessContentList.remove(pos);
-    }
-
-    public void removeRastr(final RandomAccessContent randomAccessContent) {
-        this.randomAccessContentList.remove(randomAccessContent);
-    }
-
-    public boolean hasStreams() {
-        return inputStreamList.size() > 0 || outputStream != null || randomAccessContentList.size() > 0;
-    }
-
-    public void closeOutstr() throws FileSystemException {
-        outputStream.close();
-        outputStream = null;
-    }
-
-    int getRastrsSize() {
-        return randomAccessContentList.size();
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.vfs2.provider;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+
+import org.apache.commons.vfs2.FileSystemException;
+import org.apache.commons.vfs2.RandomAccessContent;
+
+/**
+ * Holds the data which needs to be local to the current thread
+ */
+class FileContentThreadData {
+
+    // private int state = DefaultFileContent.STATE_CLOSED;
+
+    private final ArrayList<InputStream> inputStreamList = new ArrayList<>();
+    private final ArrayList<RandomAccessContent> randomAccessContentList = new ArrayList<>();
+    private DefaultFileContent.FileContentOutputStream outputStream;
+
+    FileContentThreadData() {
+    }
+
+    /*
+     * int getState() { return state; }
+     *
+     * void setState(int state) { this.state = state; }
+     */
+
+    void addInstr(final InputStream inputStream) {
+        this.inputStreamList.add(inputStream);
+    }
+
+    void setOutstr(final DefaultFileContent.FileContentOutputStream outputStream) {
+        this.outputStream = outputStream;
+    }
+
+    DefaultFileContent.FileContentOutputStream getOutstr() {
+        return this.outputStream;
+    }
+
+    void addRastr(final RandomAccessContent randomAccessContent) {
+        this.randomAccessContentList.add(randomAccessContent);
+    }
+
+    int getInstrsSize() {
+        return this.inputStreamList.size();
+    }
+
+    public Object removeInstr(final int pos) {
+        return this.inputStreamList.remove(pos);
+    }
+
+    InputStream removeInputStream(final int pos) {
+        return this.inputStreamList.remove(pos);
+    }
+
+    public void removeInstr(final InputStream inputStream) {
+        this.inputStreamList.remove(inputStream);
+    }
+
+    public Object removeRastr(final int pos) {
+        return this.randomAccessContentList.remove(pos);
+    }
+
+    public void removeRastr(final RandomAccessContent randomAccessContent) {
+        this.randomAccessContentList.remove(randomAccessContent);
+    }
+
+    public boolean hasStreams() {
+        return inputStreamList.size() > 0 || outputStream != null || randomAccessContentList.size() > 0;
+    }
+
+    public void closeOutstr() throws FileSystemException {
+        outputStream.close();
+        outputStream = null;
+    }
+
+    int getRastrsSize() {
+        return randomAccessContentList.size();
+    }
+}
diff --git a/commons-vfs2/src/main/java/org/apache/commons/vfs2/util/RawMonitorInputStream.java b/commons-vfs2/src/main/java/org/apache/commons/vfs2/util/RawMonitorInputStream.java
new file mode 100644
index 0000000..42b682a
--- /dev/null
+++ b/commons-vfs2/src/main/java/org/apache/commons/vfs2/util/RawMonitorInputStream.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.vfs2.util;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An InputStream that provides end-of-stream monitoring.
+ * <p>
+ * This is the same as {@link MonitorInputStream} but without the buffering.
+ * </p>
+ * 
+ * @since 2.5
+ */
+public class RawMonitorInputStream extends FilterInputStream {
+
+    private static final int EOF_CHAR = -1;
+    private final AtomicBoolean finished = new AtomicBoolean(false);
+    private final AtomicLong atomicCount = new AtomicLong(0);
+
+//    @Override
+//    public synchronized void reset() throws IOException {
+//        if (!finished.get()) {
+//            super.reset();
+//        }
+//    }
+//
+//    @Override
+//    public synchronized long skip(long n) throws IOException {
+//        if (finished.get()) {
+//            return 0;
+//        }
+//        return super.skip(n);
+//    }
+
+    /**
+     * Constructs a MonitorInputStream from the passed InputStream
+     *
+     * @param inputStream The input stream to wrap.
+     */
+    public RawMonitorInputStream(final InputStream inputStream) {
+        super(inputStream);
+    }
+
+    /**
+     * Returns 0 if the stream is at EOF, else the underlying inputStream will be queried.
+     *
+     * @return The number of bytes that are available.
+     * @throws IOException if an error occurs.
+     */
+    @Override
+    public synchronized int available() throws IOException {
+        if (finished.get()) {
+            return 0;
+        }
+
+        return super.available();
+    }
+
+    /**
+     * Reads a character.
+     *
+     * @return The character that was read as an integer.
+     * @throws IOException if an error occurs.
+     */
+    @Override
+    public int read() throws IOException { // lgtm [java/non-sync-override]
+        if (finished.get()) {
+            return EOF_CHAR;
+        }
+
+        final int ch = super.read();
+        if (ch != EOF_CHAR) {
+            atomicCount.incrementAndGet();
+        }
+
+        return ch;
+    }
+
+    /**
+     * Reads bytes from this input stream.
+     *
+     * @param buffer A byte array in which to place the characters read.
+     * @param offset The offset at which to start reading.
+     * @param length The maximum number of bytes to read.
+     * @return The number of bytes read.
+     * @throws IOException if an error occurs.
+     */
+    @Override
+    public int read(final byte[] buffer, final int offset, final int length) throws IOException { // lgtm [java/non-sync-override]
+        if (finished.get()) {
+            return EOF_CHAR;
+        }
+
+        final int nread = super.read(buffer, offset, length);
+        if (nread != EOF_CHAR) {
+            atomicCount.addAndGet(nread);
+        }
+        return nread;
+    }
+
+    /**
+     * Closes this input stream and releases any system resources associated with the stream.
+     *
+     * @throws IOException if an error occurs.
+     */
+    @Override
+    public void close() throws IOException {
+        final boolean closed = finished.getAndSet(true);
+        if (closed) {
+            return;
+        }
+
+        // Close the stream
+        IOException exc = null;
+        try {
+            super.close();
+        } catch (final IOException ioe) {
+            exc = ioe;
+        }
+
+        // Notify that the stream has been closed
+        try {
+            onClose();
+        } catch (final IOException ioe) {
+            exc = ioe;
+        }
+
+        if (exc != null) {
+            throw exc;
+        }
+    }
+
+    /**
+     * Called after the stream has been closed. This implementation does nothing.
+     *
+     * @throws IOException if an error occurs.
+     */
+    protected void onClose() throws IOException {
+        // noop
+    }
+
+    /**
+     * Gets the number of bytes read by this input stream.
+     *
+     * @return The number of bytes read by this input stream.
+     */
+    public long getCount() {
+        return atomicCount.get();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        // TODO Auto-generated method stub
+        super.mark(readlimit);
+    }
+}
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 5cfa369..91d6614 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -53,6 +53,9 @@ The <action> type attribute can be add,update,fix,remove.
       <action issue="VFS-726" dev="ggregory" type="fix" due-to="Cornelius Höfig, Gary Gregory">
         getInputStream(int bufferSize) on SftpFileObject effectively ignores buffer size.
       </action>
+      <action issue="VFS-704" dev="ggregory" type="fix" due-to="Boris Petrov, Gary Gregory">
+        Some providers wrap their input/output streams twice in a BufferedInputStream.
+      </action>
     </release>
     <release version="2.4.1" date="2019-08-10" description="Bug fix release.">
       <action issue="VFS-725" dev="ggregory" type="fix" due-to="Gary Gregory">