You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by gg...@apache.org on 2019/08/19 22:07:25 UTC
[commons-vfs] 02/02: - [VFS-726] getInputStream(int bufferSize) on
SftpFileObject effectively ignores buffer size. - [VFS-704] Some providers
wrap their input/output streams twice in a BufferedInputStream.
This is an automated email from the ASF dual-hosted git repository.
ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-vfs.git
commit 9fb261e7080db1516c5882c878b5acf07ab8643e
Author: Gary Gregory <ga...@gmail.com>
AuthorDate: Mon Aug 19 15:07:18 2019 -0700
- [VFS-726] getInputStream(int bufferSize) on SftpFileObject effectively
ignores buffer size.
- [VFS-704] Some providers wrap their input/output streams twice in a
BufferedInputStream.
---
.../commons/vfs2/provider/DefaultFileContent.java | 76 ++++++++-
.../vfs2/provider/FileContentThreadData.java | 190 +++++++++++----------
.../commons/vfs2/util/RawMonitorInputStream.java | 175 +++++++++++++++++++
src/changes/changes.xml | 3 +
4 files changed, 344 insertions(+), 100 deletions(-)
diff --git a/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/DefaultFileContent.java b/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/DefaultFileContent.java
index 3e1f184..d1f3508 100644
--- a/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/DefaultFileContent.java
+++ b/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/DefaultFileContent.java
@@ -16,6 +16,7 @@
*/
package org.apache.commons.vfs2.provider;
+import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -34,6 +35,7 @@ import org.apache.commons.vfs2.util.MonitorInputStream;
import org.apache.commons.vfs2.util.MonitorOutputStream;
import org.apache.commons.vfs2.util.MonitorRandomAccessContent;
import org.apache.commons.vfs2.util.RandomAccessMode;
+import org.apache.commons.vfs2.util.RawMonitorInputStream;
/**
* The content of a file.
@@ -444,10 +446,15 @@ public final class DefaultFileContent implements FileContent {
// Close the input stream
while (fileContentThreadData.getInstrsSize() > 0) {
- final FileContentInputStream inputStream = (FileContentInputStream) fileContentThreadData
- .removeInstr(0);
+ final InputStream inputStream = fileContentThreadData.removeInputStream(0);
try {
- inputStream.close();
+ if (inputStream instanceof FileContentInputStream) {
+ ((FileContentInputStream) inputStream).close();
+ } else if (inputStream instanceof RawFileContentInputStream) {
+ ((RawFileContentInputStream) inputStream).close();
+ } else {
+ caught = new FileSystemException("Unsupported InputStream type: " + inputStream);
+ }
} catch (final FileSystemException ex) {
caught = ex;
@@ -490,14 +497,29 @@ public final class DefaultFileContent implements FileContent {
* if (getThreadData().getState() == STATE_WRITING || getThreadData().getState() == STATE_RANDOM_ACCESS) { throw
* new FileSystemException("vfs.provider/read-in-use.error", file); }
*/
-
// Get the raw input stream
- final InputStream inputStream = bufferSize == 0 ? fileObject.getInputStream()
+ // @formatter:off
+ final InputStream inputStream = bufferSize == 0
+ ? fileObject.getInputStream()
: fileObject.getInputStream(bufferSize);
+ // @formatter:on
// Double buffering may take place here.
- final InputStream wrappedInputStream = bufferSize == 0
+// final InputStream wrappedInputStream = bufferSize == 0
+// ? new FileContentInputStream(fileObject, inputStream)
+// : new FileContentInputStream(fileObject, inputStream, bufferSize);
+
+ InputStream wrappedInputStream;
+ if (inputStream instanceof BufferedInputStream) {
+ // Don't double buffer.
+ wrappedInputStream = new RawFileContentInputStream(fileObject, inputStream);
+ } else
+ {
+ // @formatter:off
+ wrappedInputStream = bufferSize == 0
? new FileContentInputStream(fileObject, inputStream)
: new FileContentInputStream(fileObject, inputStream, bufferSize);
+ // @formatter:on
+ }
getOrCreateThreadData().addInstr(wrappedInputStream);
streamOpened();
@@ -530,7 +552,7 @@ public final class DefaultFileContent implements FileContent {
/**
* Handles the end of input stream.
*/
- private void endInput(final FileContentInputStream instr) {
+ private void endInput(final InputStream instr) {
final FileContentThreadData fileContentThreadData = threadLocal.get();
if (fileContentThreadData != null) {
fileContentThreadData.removeInstr(instr);
@@ -646,6 +668,46 @@ public final class DefaultFileContent implements FileContent {
}
/**
+ * An input stream for reading content. Provides buffering, and end-of-stream monitoring.
+ * <p>
+ * This is the same as {@link FileContentInputStream} but without the buffering.
+ * </p>
+ */
+ private final class RawFileContentInputStream extends RawMonitorInputStream {
+ // avoid gc
+ private final FileObject file;
+
+ RawFileContentInputStream(final FileObject file, final InputStream instr) {
+ super(instr);
+ this.file = file;
+ }
+
+ /**
+ * Closes this input stream.
+ */
+ @Override
+ public void close() throws FileSystemException {
+ try {
+ super.close();
+ } catch (final IOException e) {
+ throw new FileSystemException("vfs.provider/close-instr.error", file, e);
+ }
+ }
+
+ /**
+ * Called after the stream has been closed.
+ */
+ @Override
+ protected void onClose() throws IOException {
+ try {
+ super.onClose();
+ } finally {
+ endInput(this);
+ }
+ }
+ }
+
+ /**
* An input/output stream for reading/writing content on random positions
*/
private final class FileRandomAccessContent extends MonitorRandomAccessContent {
diff --git a/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/FileContentThreadData.java b/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/FileContentThreadData.java
index d736da5..2f08b0f 100644
--- a/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/FileContentThreadData.java
+++ b/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/FileContentThreadData.java
@@ -1,93 +1,97 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.commons.vfs2.provider;
-
-import java.io.InputStream;
-import java.util.ArrayList;
-
-import org.apache.commons.vfs2.FileSystemException;
-import org.apache.commons.vfs2.RandomAccessContent;
-
-/**
- * Holds the data which needs to be local to the current thread
- */
-class FileContentThreadData {
-
- // private int state = DefaultFileContent.STATE_CLOSED;
-
- private final ArrayList<InputStream> inputStreamList = new ArrayList<>();
- private final ArrayList<RandomAccessContent> randomAccessContentList = new ArrayList<>();
- private DefaultFileContent.FileContentOutputStream outputStream;
-
- FileContentThreadData() {
- }
-
- /*
- * int getState() { return state; }
- *
- * void setState(int state) { this.state = state; }
- */
-
- void addInstr(final InputStream inputStream) {
- this.inputStreamList.add(inputStream);
- }
-
- void setOutstr(final DefaultFileContent.FileContentOutputStream outputStream) {
- this.outputStream = outputStream;
- }
-
- DefaultFileContent.FileContentOutputStream getOutstr() {
- return this.outputStream;
- }
-
- void addRastr(final RandomAccessContent randomAccessContent) {
- this.randomAccessContentList.add(randomAccessContent);
- }
-
- int getInstrsSize() {
- return this.inputStreamList.size();
- }
-
- public Object removeInstr(final int pos) {
- return this.inputStreamList.remove(pos);
- }
-
- public void removeInstr(final InputStream inputStream) {
- this.inputStreamList.remove(inputStream);
- }
-
- public Object removeRastr(final int pos) {
- return this.randomAccessContentList.remove(pos);
- }
-
- public void removeRastr(final RandomAccessContent randomAccessContent) {
- this.randomAccessContentList.remove(randomAccessContent);
- }
-
- public boolean hasStreams() {
- return inputStreamList.size() > 0 || outputStream != null || randomAccessContentList.size() > 0;
- }
-
- public void closeOutstr() throws FileSystemException {
- outputStream.close();
- outputStream = null;
- }
-
- int getRastrsSize() {
- return randomAccessContentList.size();
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.vfs2.provider;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+
+import org.apache.commons.vfs2.FileSystemException;
+import org.apache.commons.vfs2.RandomAccessContent;
+
+/**
+ * Holds the data which needs to be local to the current thread
+ */
+class FileContentThreadData {
+
+ // private int state = DefaultFileContent.STATE_CLOSED;
+
+ private final ArrayList<InputStream> inputStreamList = new ArrayList<>();
+ private final ArrayList<RandomAccessContent> randomAccessContentList = new ArrayList<>();
+ private DefaultFileContent.FileContentOutputStream outputStream;
+
+ FileContentThreadData() {
+ }
+
+ /*
+ * int getState() { return state; }
+ *
+ * void setState(int state) { this.state = state; }
+ */
+
+ void addInstr(final InputStream inputStream) {
+ this.inputStreamList.add(inputStream);
+ }
+
+ void setOutstr(final DefaultFileContent.FileContentOutputStream outputStream) {
+ this.outputStream = outputStream;
+ }
+
+ DefaultFileContent.FileContentOutputStream getOutstr() {
+ return this.outputStream;
+ }
+
+ void addRastr(final RandomAccessContent randomAccessContent) {
+ this.randomAccessContentList.add(randomAccessContent);
+ }
+
+ int getInstrsSize() {
+ return this.inputStreamList.size();
+ }
+
+ public Object removeInstr(final int pos) {
+ return this.inputStreamList.remove(pos);
+ }
+
+ InputStream removeInputStream(final int pos) {
+ return this.inputStreamList.remove(pos);
+ }
+
+ public void removeInstr(final InputStream inputStream) {
+ this.inputStreamList.remove(inputStream);
+ }
+
+ public Object removeRastr(final int pos) {
+ return this.randomAccessContentList.remove(pos);
+ }
+
+ public void removeRastr(final RandomAccessContent randomAccessContent) {
+ this.randomAccessContentList.remove(randomAccessContent);
+ }
+
+ public boolean hasStreams() {
+ return inputStreamList.size() > 0 || outputStream != null || randomAccessContentList.size() > 0;
+ }
+
+ public void closeOutstr() throws FileSystemException {
+ outputStream.close();
+ outputStream = null;
+ }
+
+ int getRastrsSize() {
+ return randomAccessContentList.size();
+ }
+}
diff --git a/commons-vfs2/src/main/java/org/apache/commons/vfs2/util/RawMonitorInputStream.java b/commons-vfs2/src/main/java/org/apache/commons/vfs2/util/RawMonitorInputStream.java
new file mode 100644
index 0000000..42b682a
--- /dev/null
+++ b/commons-vfs2/src/main/java/org/apache/commons/vfs2/util/RawMonitorInputStream.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.vfs2.util;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An InputStream that provides end-of-stream monitoring.
+ * <p>
+ * This is the same as {@link MonitorInputStream} but without the buffering.
+ * </p>
+ *
+ * @since 2.5
+ */
+public class RawMonitorInputStream extends FilterInputStream {
+
+ private static final int EOF_CHAR = -1;
+ private final AtomicBoolean finished = new AtomicBoolean(false);
+ private final AtomicLong atomicCount = new AtomicLong(0);
+
+// @Override
+// public synchronized void reset() throws IOException {
+// if (!finished.get()) {
+// super.reset();
+// }
+// }
+//
+// @Override
+// public synchronized long skip(long n) throws IOException {
+// if (finished.get()) {
+// return 0;
+// }
+// return super.skip(n);
+// }
+
+ /**
+ * Constructs a MonitorInputStream from the passed InputStream
+ *
+ * @param inputStream The input stream to wrap.
+ */
+ public RawMonitorInputStream(final InputStream inputStream) {
+ super(inputStream);
+ }
+
+ /**
+ * Returns 0 if the stream is at EOF, else the underlying inputStream will be queried.
+ *
+ * @return The number of bytes that are available.
+ * @throws IOException if an error occurs.
+ */
+ @Override
+ public synchronized int available() throws IOException {
+ if (finished.get()) {
+ return 0;
+ }
+
+ return super.available();
+ }
+
+ /**
+ * Reads a character.
+ *
+ * @return The character that was read as an integer.
+ * @throws IOException if an error occurs.
+ */
+ @Override
+ public int read() throws IOException { // lgtm [java/non-sync-override]
+ if (finished.get()) {
+ return EOF_CHAR;
+ }
+
+ final int ch = super.read();
+ if (ch != EOF_CHAR) {
+ atomicCount.incrementAndGet();
+ }
+
+ return ch;
+ }
+
+ /**
+ * Reads bytes from this input stream.
+ *
+ * @param buffer A byte array in which to place the characters read.
+ * @param offset The offset at which to start reading.
+ * @param length The maximum number of bytes to read.
+ * @return The number of bytes read.
+ * @throws IOException if an error occurs.
+ */
+ @Override
+ public int read(final byte[] buffer, final int offset, final int length) throws IOException { // lgtm [java/non-sync-override]
+ if (finished.get()) {
+ return EOF_CHAR;
+ }
+
+ final int nread = super.read(buffer, offset, length);
+ if (nread != EOF_CHAR) {
+ atomicCount.addAndGet(nread);
+ }
+ return nread;
+ }
+
+ /**
+ * Closes this input stream and releases any system resources associated with the stream.
+ *
+ * @throws IOException if an error occurs.
+ */
+ @Override
+ public void close() throws IOException {
+ final boolean closed = finished.getAndSet(true);
+ if (closed) {
+ return;
+ }
+
+ // Close the stream
+ IOException exc = null;
+ try {
+ super.close();
+ } catch (final IOException ioe) {
+ exc = ioe;
+ }
+
+ // Notify that the stream has been closed
+ try {
+ onClose();
+ } catch (final IOException ioe) {
+ exc = ioe;
+ }
+
+ if (exc != null) {
+ throw exc;
+ }
+ }
+
+ /**
+ * Called after the stream has been closed. This implementation does nothing.
+ *
+ * @throws IOException if an error occurs.
+ */
+ protected void onClose() throws IOException {
+ // noop
+ }
+
+ /**
+ * Gets the number of bytes read by this input stream.
+ *
+ * @return The number of bytes read by this input stream.
+ */
+ public long getCount() {
+ return atomicCount.get();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ // TODO Auto-generated method stub
+ super.mark(readlimit);
+ }
+}
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 5cfa369..91d6614 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -53,6 +53,9 @@ The <action> type attribute can be add,update,fix,remove.
<action issue="VFS-726" dev="ggregory" type="fix" due-to="Cornelius Höfig, Gary Gregory">
getInputStream(int bufferSize) on SftpFileObject effectively ignores buffer size.
</action>
+ <action issue="VFS-704" dev="ggregory" type="fix" due-to="Boris Petrov, Gary Gregory">
+ Some providers wrap their input/output streams twice in a BufferedInputStream.
+ </action>
</release>
<release version="2.4.1" date="2019-08-10" description="Bug fix release.">
<action issue="VFS-725" dev="ggregory" type="fix" due-to="Gary Gregory">