You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by jo...@apache.org on 2016/06/30 09:04:21 UTC
svn commit: r1750760 - in /commons/proper/io/trunk/src: changes/
main/java/org/apache/commons/io/input/ test/java/org/apache/commons/io/input/
Author: jochen
Date: Thu Jun 30 09:04:21 2016
New Revision: 1750760
URL: http://svn.apache.org/viewvc?rev=1750760&view=rev
Log:
Added the ObservableInputStream, and the MessageDigestCalculatingInputStream.
Added:
commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java (with props)
commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java (with props)
commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java (with props)
commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java (with props)
Modified:
commons/proper/io/trunk/src/changes/changes.xml
commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java
Modified: commons/proper/io/trunk/src/changes/changes.xml
URL: http://svn.apache.org/viewvc/commons/proper/io/trunk/src/changes/changes.xml?rev=1750760&r1=1750759&r2=1750760&view=diff
==============================================================================
--- commons/proper/io/trunk/src/changes/changes.xml (original)
+++ commons/proper/io/trunk/src/changes/changes.xml Thu Jun 30 09:04:21 2016
@@ -46,6 +46,11 @@ The <action> type attribute can be add,u
<body>
<!-- The release date is the date RC is cut -->
+ <release version="2.7" date="Not yet published">
+ <action dev="jochen" type="add">
+ Added the ObservableInputStream, and the MessageDigestCalculatingInputStream.
+ </action>
+ </release>
<release version="2.6" date="2016-MM-DD" description="New features and bug fixes.">
<action issue="IO-511" dev="britter" type="fix" due-to="Ahmet Celik">
After a few unit tests, a few newly created directories not cleaned completely.
Added: commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
URL: http://svn.apache.org/viewvc/commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java?rev=1750760&view=auto
==============================================================================
--- commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java (added)
+++ commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java Thu Jun 30 09:04:21 2016
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+
+/**
+ * This class is an example for using an {@link ObservableInputStream}. It
+ * creates its own {@link Observer}, which calculates a checksum using a
+ * MessageDigest, for example an MD5 sum.
+ * {@em Note}: Neither {@link ObservableInputStream}, nor {@link MessageDigest},
+ * are thread safe. So is {@link MessageDigestCalculatingInputStream}.
+ */
+public class MessageDigestCalculatingInputStream extends ObservableInputStream {
+ public static class MessageDigestMaintainingObserver extends Observer {
+ private final MessageDigest md;
+
+ public MessageDigestMaintainingObserver(MessageDigest pMd) {
+ md = pMd;
+ }
+
+ @Override
+ void data(int pByte) throws IOException {
+ md.update((byte) pByte);
+ }
+
+ @Override
+ void data(byte[] pBuffer, int pOffset, int pLength) throws IOException {
+ md.update(pBuffer, pOffset, pLength);
+ }
+ }
+
+ private final MessageDigest messageDigest;
+
+ /** Creates a new instance, which calculates a signature on the given stream,
+ * using the given {@link MessageDigest}.
+ */
+ public MessageDigestCalculatingInputStream(InputStream pStream, MessageDigest pDigest) {
+ super(pStream);
+ messageDigest = pDigest;
+ add(new MessageDigestMaintainingObserver(pDigest));
+ }
+ /** Creates a new instance, which calculates a signature on the given stream,
+ * using a {@link MessageDigest} with the given algorithm.
+ */
+ public MessageDigestCalculatingInputStream(InputStream pStream, String pAlgorithm) throws NoSuchAlgorithmException {
+ this(pStream, MessageDigest.getInstance(pAlgorithm));
+ }
+ /** Creates a new instance, which calculates a signature on the given stream,
+ * using a {@link MessageDigest} with the "MD5" algorithm.
+ */
+ public MessageDigestCalculatingInputStream(InputStream pStream) throws NoSuchAlgorithmException {
+ this(pStream, MessageDigest.getInstance("MD5"));
+ }
+
+ /** Returns the {@link MessageDigest}, which is being used for generating the
+ * checksum.
+ * {@em Note}: The checksum will only reflect the data, which has been read so far.
+ * This is probably not, what you expect. Make sure, that the complete data has been
+ * read, if that is what you want. The easiest way to do so is by invoking
+ * {@link #consume()}.
+ */
+ public MessageDigest getMessageDigest() {
+ return messageDigest;
+ }
+}
Propchange: commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
URL: http://svn.apache.org/viewvc/commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java?rev=1750760&view=auto
==============================================================================
--- commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java (added)
+++ commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java Thu Jun 30 09:04:21 2016
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * The {@link ObservableInputStream} allows, that an InputStream may be consumed
+ * by other receivers, apart from the thread, which is reading it.
+ * The other consumers are implemented as instances of {@link Observer}. A
+ * typical application may be the generation of a {@link MessageDigest} on the
+ * fly.
+ * {@code Note}: The {@link ObservableInputStream} is <em>not</em> thread safe,
+ * as instances of InputStream usually aren't.
+ * If you must access the stream from multiple threads, then synchronization, locking,
+ * or a similar means must be used.
+ * @see MessageDigestCalculatingInputStream
+ */
+public class ObservableInputStream extends ProxyInputStream {
+ public static abstract class Observer {
+ /** Called to indicate, that {@link InputStream#read()} has been invoked
+ * on the {@link ObservableInputStream}, and will return a value.
+ * @param pByte The value, which is being returned. This will never be -1 (EOF),
+ * because, in that case, {link #finished()} will be invoked instead.
+ */
+ void data(int pByte) throws IOException {}
+ /** Called to indicate, that {@link InputStream#read(byte[])}, or
+ * {@link InputStream#read(byte[], int, int)} have been called, and are about to
+ * invoke data.
+ * @param pBuffer The byte array, which has been passed to the read call, and where
+ * data has been stored.
+ * @param pOffset The offset within the byte array, where data has been stored.
+ * @param pLength The number of bytes, which have been stored in the byte array.
+ */
+ void data(byte[] pBuffer, int pOffset, int pLength) throws IOException {}
+ /** Called to indicate, that EOF has been seen on the underlying stream.
+ * This method may be called multiple times, if the reader keeps invoking
+ * either of the read methods, and they will consequently keep returning
+ * EOF.
+ */
+ void finished() throws IOException {}
+ /** Called to indicate, that the {@link ObservableInputStream} has been closed.
+ */
+ void closed() throws IOException {}
+ /**
+ * Called to indicate, that an error occurred on the underlying stream.
+ */
+ void error(IOException pException) throws IOException { throw pException; }
+ }
+
+ private final List<Observer> observers = new ArrayList<Observer>();
+
+ public ObservableInputStream(InputStream pProxy) {
+ super(pProxy);
+ }
+
+ public void add(Observer pObserver) {
+ observers.add(pObserver);
+ }
+
+ public void remove(Observer pObserver) {
+ observers.remove(pObserver);
+ }
+
+ public void removeAllObservers() {
+ observers.clear();
+ }
+
+ @Override
+ public int read() throws IOException {
+ int result = 0;
+ IOException ioe = null;
+ try {
+ result = super.read();
+ } catch (IOException pException) {
+ ioe = pException;
+ }
+ if (ioe != null) {
+ noteError(ioe);
+ } else if (result == -1) {
+ noteFinished();
+ } else {
+ noteDataByte(result);
+ }
+ return result;
+ }
+
+ @Override
+ public int read(byte[] pBuffer) throws IOException {
+ int result = 0;
+ IOException ioe = null;
+ try {
+ result = super.read(pBuffer);
+ } catch (IOException pException) {
+ ioe = pException;
+ }
+ if (ioe != null) {
+ noteError(ioe);
+ } else if (result == -1) {
+ noteFinished();
+ } else if (result > 0) {
+ noteDataBytes(pBuffer, 0, result);
+ }
+ return result;
+ }
+
+ @Override
+ public int read(byte[] pBuffer, int pOffset, int pLength) throws IOException {
+ int result = 0;
+ IOException ioe = null;
+ try {
+ result = super.read(pBuffer, pOffset, pLength);
+ } catch (IOException pException) {
+ ioe = pException;
+ }
+ if (ioe != null) {
+ noteError(ioe);
+ } else if (result == -1) {
+ noteFinished();
+ } else if (result > 0) {
+ noteDataBytes(pBuffer, pOffset, result);
+ }
+ return result;
+ }
+
+ /** Notifies the observers by invoking {@link Observer#data(byte[],int,int)}
+ * with the given arguments.
+ * @param pBuffer Passed to the observers.
+ * @param pOffset Passed to the observers.
+ * @param pLength Passed to the observers.
+ * @throws IOException Some observer has thrown an exception, which is being
+ * passed down.
+ */
+ protected void noteDataBytes(byte[] pBuffer, int pOffset, int pLength) throws IOException {
+ for (Observer observer : getObservers()) {
+ observer.data(pBuffer, pOffset, pLength);
+ }
+ }
+
+ /** Notifies the observers by invoking {@link Observer#finished()}.
+ * @throws IOException Some observer has thrown an exception, which is being
+ * passed down.
+ */
+ protected void noteFinished() throws IOException {
+ for (Observer observer : getObservers()) {
+ observer.finished();
+ }
+ }
+
+ /** Notifies the observers by invoking {@link Observer#data(int)}
+ * with the given arguments.
+ * @param pDataByte Passed to the observers.
+ * @throws IOException Some observer has thrown an exception, which is being
+ * passed down.
+ */
+ protected void noteDataByte(int pDataByte) throws IOException {
+ for (Observer observer : getObservers()) {
+ observer.data(pDataByte);
+ }
+ }
+
+ /** Notifies the observers by invoking {@link Observer#error(IOException)}
+ * with the given argument.
+ * @param pException Passed to the observers.
+ * @throws IOException Some observer has thrown an exception, which is being
+ * passed down. This may be the same exception, which has been passed as an
+ * argument.
+ */
+ protected void noteError(IOException pException) throws IOException {
+ for (Observer observer : getObservers()) {
+ observer.error(pException);
+ }
+ }
+
+ /** Notifies the observers by invoking {@link Observer#finished()}.
+ * @throws IOException Some observer has thrown an exception, which is being
+ * passed down.
+ */
+ protected void noteClosed() throws IOException {
+ for (Observer observer : getObservers()) {
+ observer.closed();
+ }
+ }
+
+ protected List<Observer> getObservers() {
+ return observers;
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOException ioe = null;
+ try {
+ super.close();
+ } catch (IOException e) {
+ ioe = e;
+ }
+ if (ioe == null) {
+ noteClosed();
+ } else {
+ noteError(ioe);
+ }
+ }
+
+ /** Reads all data from the underlying {@link InputStream}, while notifying the
+ * observers.
+ * @throws IOException The underlying {@link InputStream}, or either of the
+ * observers has thrown an exception.
+ */
+ public void consume() throws IOException {
+ final byte[] buffer = new byte[8192];
+ for (;;) {
+ final int res = read(buffer);
+ if (res == -1) {
+ return;
+ }
+ }
+ }
+
+}
Propchange: commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java
URL: http://svn.apache.org/viewvc/commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java?rev=1750760&r1=1750759&r2=1750760&view=diff
==============================================================================
--- commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java (original)
+++ commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java Thu Jun 30 09:04:21 2016
@@ -35,6 +35,7 @@ import java.io.OutputStream;
*
* @version $Id$
* @since 1.4
+ * @see ObservableInputStream
*/
public class TeeInputStream extends ProxyInputStream {
Added: commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
URL: http://svn.apache.org/viewvc/commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java?rev=1750760&view=auto
==============================================================================
--- commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java (added)
+++ commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java Thu Jun 30 09:04:21 2016
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.security.MessageDigest;
+import java.util.Random;
+
+import org.junit.Test;
+
+public class MessageDigestCalculatingInputStreamTest {
+ public static byte[] generateRandomByteStream(int pSize) {
+ final byte[] buffer = new byte[pSize];
+ final Random rnd = new Random();
+ rnd.nextBytes(buffer);
+ return buffer;
+ }
+
+ @Test
+ public void test() throws Exception {
+ for (int i = 256; i < 8192; i = i*2) {
+ final byte[] buffer = generateRandomByteStream(i);
+ final MessageDigest md5Sum = MessageDigest.getInstance("MD5");
+ final byte[] expect = md5Sum.digest(buffer);
+ final MessageDigestCalculatingInputStream md5InputStream = new MessageDigestCalculatingInputStream(new ByteArrayInputStream(buffer));
+ md5InputStream.consume();
+ final byte[] got = md5InputStream.getMessageDigest().digest();
+ assertArrayEquals(expect, got);
+ }
+ }
+
+}
Propchange: commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
URL: http://svn.apache.org/viewvc/commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java?rev=1750760&view=auto
==============================================================================
--- commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java (added)
+++ commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java Thu Jun 30 09:04:21 2016
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.apache.commons.io.input.ObservableInputStream;
+import org.apache.commons.io.input.ObservableInputStream.Observer;
+import org.junit.Test;
+
+public class ObservableInputStreamTest {
+ private static class LastByteKeepingObserver extends Observer {
+ private int lastByteSeen = -1;
+ private boolean finished;
+ private boolean closed;
+
+ @Override
+ void data(int pByte) throws IOException {
+ super.data(pByte);
+ lastByteSeen = pByte;
+ }
+
+ @Override
+ void finished() throws IOException {
+ super.finished();
+ finished = true;
+ }
+
+ @Override
+ void closed() throws IOException {
+ super.closed();
+ closed = true;
+ }
+ }
+ private static class LastBytesKeepingObserver extends Observer {
+ private byte[] buffer = null;
+ private int offset = -1;
+ private int length = -1;
+
+ @Override
+ void data(byte[] pBuffer, int pOffset, int pLength) throws IOException {
+ super.data(pBuffer, pOffset, pLength);
+ buffer = pBuffer;
+ offset = pOffset;
+ length = pLength;
+ }
+ }
+
+ /** Tests, that {@link Observer#data(int)} is called.
+ */
+ @Test
+ public void testDataByteCalled() throws Exception {
+ final byte[] buffer = MessageDigestCalculatingInputStreamTest.generateRandomByteStream(4096);
+ final ObservableInputStream ois = new ObservableInputStream(new ByteArrayInputStream(buffer));
+ final LastByteKeepingObserver lko = new LastByteKeepingObserver();
+ assertEquals(-1, lko.lastByteSeen);
+ ois.read();
+ assertEquals(-1, lko.lastByteSeen);
+ assertFalse(lko.finished);
+ assertFalse(lko.closed);
+ ois.add(lko);
+ for (int i = 1; i < buffer.length; i++) {
+ final int result = ois.read();
+ assertEquals((byte) result, buffer[i]);
+ assertEquals(result, lko.lastByteSeen);
+ assertFalse(lko.finished);
+ assertFalse(lko.closed);
+ }
+ final int result = ois.read();
+ assertEquals(-1, result);
+ assertTrue(lko.finished);
+ assertFalse(lko.closed);
+ ois.close();
+ assertTrue(lko.finished);
+ assertTrue(lko.closed);
+ }
+
+ /** Tests, that {@link Observer#data(byte[],int,int)} is called.
+ */
+ @Test
+ public void testDataBytesCalled() throws Exception {
+ final byte[] buffer = MessageDigestCalculatingInputStreamTest.generateRandomByteStream(4096);
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+ final ObservableInputStream ois = new ObservableInputStream(bais);
+ final LastBytesKeepingObserver lko = new LastBytesKeepingObserver();
+ final byte[] readBuffer = new byte[23];
+ assertEquals(null, lko.buffer);
+ ois.read(readBuffer);
+ assertEquals(null, lko.buffer);
+ ois.add(lko);
+ for (;;) {
+ if (bais.available() >= 2048) {
+ final int result = ois.read(readBuffer);
+ if (result == -1) {
+ ois.close();
+ break;
+ } else {
+ assertEquals(readBuffer, lko.buffer);
+ assertEquals(0, lko.offset);
+ assertEquals(readBuffer.length, lko.length);
+ }
+ } else {
+ final int res = Math.min(11, bais.available());
+ final int result = ois.read(readBuffer, 1, 11);
+ if (result == -1) {
+ ois.close();
+ break;
+ } else {
+ assertEquals(readBuffer, lko.buffer);
+ assertEquals(1, lko.offset);
+ assertEquals(res, lko.length);
+ }
+ }
+ }
+ }
+
+}
Propchange: commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Re: svn commit: r1750760 - in /commons/proper/io/trunk/src: changes/
main/java/org/apache/commons/io/input/ test/java/org/apache/commons/io/input/
Posted by Benedikt Ritter <br...@apache.org>.
Hi Jochen,
wouldn't it be good to have a Jira issue for this change?
Regards,
Benedikt
<jo...@apache.org> schrieb am Do., 30. Juni 2016 um 11:04:
> Author: jochen
> Date: Thu Jun 30 09:04:21 2016
> New Revision: 1750760
>
> URL: http://svn.apache.org/viewvc?rev=1750760&view=rev
> Log:
> Added the ObservableInputStream, and the
> MessageDigestCalculatingInputStream.
>
> Added:
>
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
> (with props)
>
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
> (with props)
>
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
> (with props)
>
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
> (with props)
> Modified:
> commons/proper/io/trunk/src/changes/changes.xml
>
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java
>
> Modified: commons/proper/io/trunk/src/changes/changes.xml
> URL:
> http://svn.apache.org/viewvc/commons/proper/io/trunk/src/changes/changes.xml?rev=1750760&r1=1750759&r2=1750760&view=diff
>
> ==============================================================================
> --- commons/proper/io/trunk/src/changes/changes.xml (original)
> +++ commons/proper/io/trunk/src/changes/changes.xml Thu Jun 30 09:04:21
> 2016
> @@ -46,6 +46,11 @@ The <action> type attribute can be add,u
>
> <body>
> <!-- The release date is the date RC is cut -->
> + <release version="2.7" date="Not yet published">
> + <action dev="jochen" type="add">
> + Added the ObservableInputStream, and the
> MessageDigestCalculatingInputStream.
> + </action>
> + </release>
> <release version="2.6" date="2016-MM-DD" description="New features
> and bug fixes.">
> <action issue="IO-511" dev="britter" type="fix" due-to="Ahmet
> Celik">
> After a few unit tests, a few newly created directories not
> cleaned completely.
>
> Added:
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
> URL:
> http://svn.apache.org/viewvc/commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java?rev=1750760&view=auto
>
> ==============================================================================
> ---
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
> (added)
> +++
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
> Thu Jun 30 09:04:21 2016
> @@ -0,0 +1,84 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.commons.io.input;
> +
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.security.MessageDigest;
> +import java.security.NoSuchAlgorithmException;
> +
> +
> +/**
> + * This class is an example for using an {@link ObservableInputStream}. It
> + * creates its own {@link Observer}, which calculates a checksum using a
> + * MessageDigest, for example an MD5 sum.
> + * {@em Note}: Neither {@link ObservableInputStream}, nor {@link
> MessageDigest},
> + * are thread safe. So is {@link MessageDigestCalculatingInputStream}.
> + */
> +public class MessageDigestCalculatingInputStream extends
> ObservableInputStream {
> + public static class MessageDigestMaintainingObserver extends Observer
> {
> + private final MessageDigest md;
> +
> + public MessageDigestMaintainingObserver(MessageDigest pMd) {
> + md = pMd;
> + }
> +
> + @Override
> + void data(int pByte) throws IOException {
> + md.update((byte) pByte);
> + }
> +
> + @Override
> + void data(byte[] pBuffer, int pOffset, int pLength) throws
> IOException {
> + md.update(pBuffer, pOffset, pLength);
> + }
> + }
> +
> + private final MessageDigest messageDigest;
> +
> + /** Creates a new instance, which calculates a signature on the given
> stream,
> + * using the given {@link MessageDigest}.
> + */
> + public MessageDigestCalculatingInputStream(InputStream pStream,
> MessageDigest pDigest) {
> + super(pStream);
> + messageDigest = pDigest;
> + add(new MessageDigestMaintainingObserver(pDigest));
> + }
> + /** Creates a new instance, which calculates a signature on the given
> stream,
> + * using a {@link MessageDigest} with the given algorithm.
> + */
> + public MessageDigestCalculatingInputStream(InputStream pStream,
> String pAlgorithm) throws NoSuchAlgorithmException {
> + this(pStream, MessageDigest.getInstance(pAlgorithm));
> + }
> + /** Creates a new instance, which calculates a signature on the given
> stream,
> + * using a {@link MessageDigest} with the "MD5" algorithm.
> + */
> + public MessageDigestCalculatingInputStream(InputStream pStream)
> throws NoSuchAlgorithmException {
> + this(pStream, MessageDigest.getInstance("MD5"));
> + }
> +
> + /** Returns the {@link MessageDigest}, which is being used for
> generating the
> + * checksum.
> + * {@em Note}: The checksum will only reflect the data, which has
> been read so far.
> + * This is probably not, what you expect. Make sure, that the
> complete data has been
> + * read, if that is what you want. The easiest way to do so is by
> invoking
> + * {@link #consume()}.
> + */
> + public MessageDigest getMessageDigest() {
> + return messageDigest;
> + }
> +}
>
> Propchange:
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
>
> ------------------------------------------------------------------------------
> svn:mime-type = text/plain
>
> Added:
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
> URL:
> http://svn.apache.org/viewvc/commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java?rev=1750760&view=auto
>
> ==============================================================================
> ---
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
> (added)
> +++
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
> Thu Jun 30 09:04:21 2016
> @@ -0,0 +1,238 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.commons.io.input;
> +
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.security.MessageDigest;
> +import java.util.ArrayList;
> +import java.util.List;
> +
> +
> +/**
> + * The {@link ObservableInputStream} allows, that an InputStream may be
> consumed
> + * by other receivers, apart from the thread, which is reading it.
> + * The other consumers are implemented as instances of {@link Observer}. A
> + * typical application may be the generation of a {@link MessageDigest}
> on the
> + * fly.
> + * {@code Note}: The {@link ObservableInputStream} is <em>not</em> thread
> safe,
> + * as instances of InputStream usually aren't.
> + * If you must access the stream from multiple threads, then
> synchronization, locking,
> + * or a similar means must be used.
> + * @see MessageDigestCalculatingInputStream
> + */
> +public class ObservableInputStream extends ProxyInputStream {
> + public static abstract class Observer {
> + /** Called to indicate, that {@link InputStream#read()} has been
> invoked
> + * on the {@link ObservableInputStream}, and will return a value.
> + * @param pByte The value, which is being returned. This will
> never be -1 (EOF),
> + * because, in that case, {link #finished()} will be invoked
> instead.
> + */
> + void data(int pByte) throws IOException {}
> + /** Called to indicate, that {@link InputStream#read(byte[])}, or
> + * {@link InputStream#read(byte[], int, int)} have been called,
> and are about to
> + * invoke data.
> + * @param pBuffer The byte array, which has been passed to the
> read call, and where
> + * data has been stored.
> + * @param pOffset The offset within the byte array, where data
> has been stored.
> + * @param pLength The number of bytes, which have been stored in
> the byte array.
> + */
> + void data(byte[] pBuffer, int pOffset, int pLength) throws
> IOException {}
> + /** Called to indicate, that EOF has been seen on the underlying
> stream.
> + * This method may be called multiple times, if the reader keeps
> invoking
> + * either of the read methods, and they will consequently keep
> returning
> + * EOF.
> + */
> + void finished() throws IOException {}
> + /** Called to indicate, that the {@link ObservableInputStream}
> has been closed.
> + */
> + void closed() throws IOException {}
> + /**
> + * Called to indicate, that an error occurred on the underlying
> stream.
> + */
> + void error(IOException pException) throws IOException { throw
> pException; }
> + }
> +
> + private final List<Observer> observers = new ArrayList<Observer>();
> +
> + public ObservableInputStream(InputStream pProxy) {
> + super(pProxy);
> + }
> +
> + public void add(Observer pObserver) {
> + observers.add(pObserver);
> + }
> +
> + public void remove(Observer pObserver) {
> + observers.remove(pObserver);
> + }
> +
> + public void removeAllObservers() {
> + observers.clear();
> + }
> +
> + @Override
> + public int read() throws IOException {
> + int result = 0;
> + IOException ioe = null;
> + try {
> + result = super.read();
> + } catch (IOException pException) {
> + ioe = pException;
> + }
> + if (ioe != null) {
> + noteError(ioe);
> + } else if (result == -1) {
> + noteFinished();
> + } else {
> + noteDataByte(result);
> + }
> + return result;
> + }
> +
> + @Override
> + public int read(byte[] pBuffer) throws IOException {
> + int result = 0;
> + IOException ioe = null;
> + try {
> + result = super.read(pBuffer);
> + } catch (IOException pException) {
> + ioe = pException;
> + }
> + if (ioe != null) {
> + noteError(ioe);
> + } else if (result == -1) {
> + noteFinished();
> + } else if (result > 0) {
> + noteDataBytes(pBuffer, 0, result);
> + }
> + return result;
> + }
> +
> + @Override
> + public int read(byte[] pBuffer, int pOffset, int pLength) throws
> IOException {
> + int result = 0;
> + IOException ioe = null;
> + try {
> + result = super.read(pBuffer, pOffset, pLength);
> + } catch (IOException pException) {
> + ioe = pException;
> + }
> + if (ioe != null) {
> + noteError(ioe);
> + } else if (result == -1) {
> + noteFinished();
> + } else if (result > 0) {
> + noteDataBytes(pBuffer, pOffset, result);
> + }
> + return result;
> + }
> +
> + /** Notifies the observers by invoking {@link
> Observer#data(byte[],int,int)}
> + * with the given arguments.
> + * @param pBuffer Passed to the observers.
> + * @param pOffset Passed to the observers.
> + * @param pLength Passed to the observers.
> + * @throws IOException Some observer has thrown an exception, which
> is being
> + * passed down.
> + */
> + protected void noteDataBytes(byte[] pBuffer, int pOffset, int
> pLength) throws IOException {
> + for (Observer observer : getObservers()) {
> + observer.data(pBuffer, pOffset, pLength);
> + }
> + }
> +
> + /** Notifies the observers by invoking {@link Observer#finished()}.
> + * @throws IOException Some observer has thrown an exception, which
> is being
> + * passed down.
> + */
> + protected void noteFinished() throws IOException {
> + for (Observer observer : getObservers()) {
> + observer.finished();
> + }
> + }
> +
> + /** Notifies the observers by invoking {@link Observer#data(int)}
> + * with the given arguments.
> + * @param pDataByte Passed to the observers.
> + * @throws IOException Some observer has thrown an exception, which
> is being
> + * passed down.
> + */
> + protected void noteDataByte(int pDataByte) throws IOException {
> + for (Observer observer : getObservers()) {
> + observer.data(pDataByte);
> + }
> + }
> +
> + /** Notifies the observers by invoking {@link
> Observer#error(IOException)}
> + * with the given argument.
> + * @param pException Passed to the observers.
> + * @throws IOException Some observer has thrown an exception, which
> is being
> + * passed down. This may be the same exception, which has been
> passed as an
> + * argument.
> + */
> + protected void noteError(IOException pException) throws IOException {
> + for (Observer observer : getObservers()) {
> + observer.error(pException);
> + }
> + }
> +
> + /** Notifies the observers by invoking {@link Observer#finished()}.
> + * @throws IOException Some observer has thrown an exception, which
> is being
> + * passed down.
> + */
> + protected void noteClosed() throws IOException {
> + for (Observer observer : getObservers()) {
> + observer.closed();
> + }
> + }
> +
> + protected List<Observer> getObservers() {
> + return observers;
> + }
> +
> + @Override
> + public void close() throws IOException {
> + IOException ioe = null;
> + try {
> + super.close();
> + } catch (IOException e) {
> + ioe = e;
> + }
> + if (ioe == null) {
> + noteClosed();
> + } else {
> + noteError(ioe);
> + }
> + }
> +
> + /** Reads all data from the underlying {@link InputStream}, while
> notifying the
> + * observers.
> + * @throws IOException The underlying {@link InputStream}, or either
> of the
> + * observers has thrown an exception.
> + */
> + public void consume() throws IOException {
> + final byte[] buffer = new byte[8192];
> + for (;;) {
> + final int res = read(buffer);
> + if (res == -1) {
> + return;
> + }
> + }
> + }
> +
> +}
>
> Propchange:
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
>
> ------------------------------------------------------------------------------
> svn:mime-type = text/plain
>
> Modified:
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java
> URL:
> http://svn.apache.org/viewvc/commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java?rev=1750760&r1=1750759&r2=1750760&view=diff
>
> ==============================================================================
> ---
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java
> (original)
> +++
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java
> Thu Jun 30 09:04:21 2016
> @@ -35,6 +35,7 @@ import java.io.OutputStream;
> *
> * @version $Id$
> * @since 1.4
> + * @see ObservableInputStream
> */
> public class TeeInputStream extends ProxyInputStream {
>
>
> Added:
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
> URL:
> http://svn.apache.org/viewvc/commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java?rev=1750760&view=auto
>
> ==============================================================================
> ---
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
> (added)
> +++
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
> Thu Jun 30 09:04:21 2016
> @@ -0,0 +1,48 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.commons.io.input;
> +
> +import static org.junit.Assert.*;
> +
> +import java.io.ByteArrayInputStream;
> +import java.security.MessageDigest;
> +import java.util.Random;
> +
> +import org.junit.Test;
> +
> +public class MessageDigestCalculatingInputStreamTest {
> + public static byte[] generateRandomByteStream(int pSize) {
> + final byte[] buffer = new byte[pSize];
> + final Random rnd = new Random();
> + rnd.nextBytes(buffer);
> + return buffer;
> + }
> +
> + @Test
> + public void test() throws Exception {
> + for (int i = 256; i < 8192; i = i*2) {
> + final byte[] buffer = generateRandomByteStream(i);
> + final MessageDigest md5Sum = MessageDigest.getInstance("MD5");
> + final byte[] expect = md5Sum.digest(buffer);
> + final MessageDigestCalculatingInputStream md5InputStream =
> new MessageDigestCalculatingInputStream(new ByteArrayInputStream(buffer));
> + md5InputStream.consume();
> + final byte[] got = md5InputStream.getMessageDigest().digest();
> + assertArrayEquals(expect, got);
> + }
> + }
> +
> +}
>
> Propchange:
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
>
> ------------------------------------------------------------------------------
> svn:mime-type = text/plain
>
> Added:
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
> URL:
> http://svn.apache.org/viewvc/commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java?rev=1750760&view=auto
>
> ==============================================================================
> ---
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
> (added)
> +++
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
> Thu Jun 30 09:04:21 2016
> @@ -0,0 +1,134 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.commons.io.input;
> +
> +import static org.junit.Assert.*;
> +
> +import java.io.ByteArrayInputStream;
> +import java.io.IOException;
> +
> +import org.apache.commons.io.input.ObservableInputStream;
> +import org.apache.commons.io.input.ObservableInputStream.Observer;
> +import org.junit.Test;
> +
> +public class ObservableInputStreamTest {
> + private static class LastByteKeepingObserver extends Observer {
> + private int lastByteSeen = -1;
> + private boolean finished;
> + private boolean closed;
> +
> + @Override
> + void data(int pByte) throws IOException {
> + super.data(pByte);
> + lastByteSeen = pByte;
> + }
> +
> + @Override
> + void finished() throws IOException {
> + super.finished();
> + finished = true;
> + }
> +
> + @Override
> + void closed() throws IOException {
> + super.closed();
> + closed = true;
> + }
> + }
> + private static class LastBytesKeepingObserver extends Observer {
> + private byte[] buffer = null;
> + private int offset = -1;
> + private int length = -1;
> +
> + @Override
> + void data(byte[] pBuffer, int pOffset, int pLength) throws
> IOException {
> + super.data(pBuffer, pOffset, pLength);
> + buffer = pBuffer;
> + offset = pOffset;
> + length = pLength;
> + }
> + }
> +
> + /** Tests, that {@link Observer#data(int)} is called.
> + */
> + @Test
> + public void testDataByteCalled() throws Exception {
> + final byte[] buffer =
> MessageDigestCalculatingInputStreamTest.generateRandomByteStream(4096);
> + final ObservableInputStream ois = new ObservableInputStream(new
> ByteArrayInputStream(buffer));
> + final LastByteKeepingObserver lko = new LastByteKeepingObserver();
> + assertEquals(-1, lko.lastByteSeen);
> + ois.read();
> + assertEquals(-1, lko.lastByteSeen);
> + assertFalse(lko.finished);
> + assertFalse(lko.closed);
> + ois.add(lko);
> + for (int i = 1; i < buffer.length; i++) {
> + final int result = ois.read();
> + assertEquals((byte) result, buffer[i]);
> + assertEquals(result, lko.lastByteSeen);
> + assertFalse(lko.finished);
> + assertFalse(lko.closed);
> + }
> + final int result = ois.read();
> + assertEquals(-1, result);
> + assertTrue(lko.finished);
> + assertFalse(lko.closed);
> + ois.close();
> + assertTrue(lko.finished);
> + assertTrue(lko.closed);
> + }
> +
> + /** Tests, that {@link Observer#data(byte[],int,int)} is called.
> + */
> + @Test
> + public void testDataBytesCalled() throws Exception {
> + final byte[] buffer =
> MessageDigestCalculatingInputStreamTest.generateRandomByteStream(4096);
> + ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
> + final ObservableInputStream ois = new ObservableInputStream(bais);
> + final LastBytesKeepingObserver lko = new
> LastBytesKeepingObserver();
> + final byte[] readBuffer = new byte[23];
> + assertEquals(null, lko.buffer);
> + ois.read(readBuffer);
> + assertEquals(null, lko.buffer);
> + ois.add(lko);
> + for (;;) {
> + if (bais.available() >= 2048) {
> + final int result = ois.read(readBuffer);
> + if (result == -1) {
> + ois.close();
> + break;
> + } else {
> + assertEquals(readBuffer, lko.buffer);
> + assertEquals(0, lko.offset);
> + assertEquals(readBuffer.length, lko.length);
> + }
> + } else {
> + final int res = Math.min(11, bais.available());
> + final int result = ois.read(readBuffer, 1, 11);
> + if (result == -1) {
> + ois.close();
> + break;
> + } else {
> + assertEquals(readBuffer, lko.buffer);
> + assertEquals(1, lko.offset);
> + assertEquals(res, lko.length);
> + }
> + }
> + }
> + }
> +
> +}
>
> Propchange:
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
>
> ------------------------------------------------------------------------------
> svn:mime-type = text/plain
>
>
>
Re: svn commit: r1750760 - in /commons/proper/io/trunk/src: changes/
main/java/org/apache/commons/io/input/ test/java/org/apache/commons/io/input/
Posted by Benedikt Ritter <br...@apache.org>.
Hi Jochen,
wouldn't it be good to have a Jira issue for this change?
Regards,
Benedikt
<jo...@apache.org> schrieb am Do., 30. Juni 2016 um 11:04:
> Author: jochen
> Date: Thu Jun 30 09:04:21 2016
> New Revision: 1750760
>
> URL: http://svn.apache.org/viewvc?rev=1750760&view=rev
> Log:
> Added the ObservableInputStream, and the
> MessageDigestCalculatingInputStream.
>
> Added:
>
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
> (with props)
>
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
> (with props)
>
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
> (with props)
>
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
> (with props)
> Modified:
> commons/proper/io/trunk/src/changes/changes.xml
>
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java
>
> Modified: commons/proper/io/trunk/src/changes/changes.xml
> URL:
> http://svn.apache.org/viewvc/commons/proper/io/trunk/src/changes/changes.xml?rev=1750760&r1=1750759&r2=1750760&view=diff
>
> ==============================================================================
> --- commons/proper/io/trunk/src/changes/changes.xml (original)
> +++ commons/proper/io/trunk/src/changes/changes.xml Thu Jun 30 09:04:21
> 2016
> @@ -46,6 +46,11 @@ The <action> type attribute can be add,u
>
> <body>
> <!-- The release date is the date RC is cut -->
> + <release version="2.7" date="Not yet published">
> + <action dev="jochen" type="add">
> + Added the ObservableInputStream, and the
> MessageDigestCalculatingInputStream.
> + </action>
> + </release>
> <release version="2.6" date="2016-MM-DD" description="New features
> and bug fixes.">
> <action issue="IO-511" dev="britter" type="fix" due-to="Ahmet
> Celik">
> After a few unit tests, a few newly created directories not
> cleaned completely.
>
> Added:
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
> URL:
> http://svn.apache.org/viewvc/commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java?rev=1750760&view=auto
>
> ==============================================================================
> ---
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
> (added)
> +++
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
> Thu Jun 30 09:04:21 2016
> @@ -0,0 +1,84 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.commons.io.input;
> +
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.security.MessageDigest;
> +import java.security.NoSuchAlgorithmException;
> +
> +
> +/**
> + * This class is an example for using an {@link ObservableInputStream}. It
> + * creates its own {@link Observer}, which calculates a checksum using a
> + * MessageDigest, for example an MD5 sum.
> + * {@em Note}: Neither {@link ObservableInputStream}, nor {@link
> MessageDigest},
> + * are thread safe. So is {@link MessageDigestCalculatingInputStream}.
> + */
> +public class MessageDigestCalculatingInputStream extends
> ObservableInputStream {
> + public static class MessageDigestMaintainingObserver extends Observer
> {
> + private final MessageDigest md;
> +
> + public MessageDigestMaintainingObserver(MessageDigest pMd) {
> + md = pMd;
> + }
> +
> + @Override
> + void data(int pByte) throws IOException {
> + md.update((byte) pByte);
> + }
> +
> + @Override
> + void data(byte[] pBuffer, int pOffset, int pLength) throws
> IOException {
> + md.update(pBuffer, pOffset, pLength);
> + }
> + }
> +
> + private final MessageDigest messageDigest;
> +
> + /** Creates a new instance, which calculates a signature on the given
> stream,
> + * using the given {@link MessageDigest}.
> + */
> + public MessageDigestCalculatingInputStream(InputStream pStream,
> MessageDigest pDigest) {
> + super(pStream);
> + messageDigest = pDigest;
> + add(new MessageDigestMaintainingObserver(pDigest));
> + }
> + /** Creates a new instance, which calculates a signature on the given
> stream,
> + * using a {@link MessageDigest} with the given algorithm.
> + */
> + public MessageDigestCalculatingInputStream(InputStream pStream,
> String pAlgorithm) throws NoSuchAlgorithmException {
> + this(pStream, MessageDigest.getInstance(pAlgorithm));
> + }
> + /** Creates a new instance, which calculates a signature on the given
> stream,
> + * using a {@link MessageDigest} with the "MD5" algorithm.
> + */
> + public MessageDigestCalculatingInputStream(InputStream pStream)
> throws NoSuchAlgorithmException {
> + this(pStream, MessageDigest.getInstance("MD5"));
> + }
> +
> + /** Returns the {@link MessageDigest}, which is being used for
> generating the
> + * checksum.
> + * {@em Note}: The checksum will only reflect the data, which has
> been read so far.
> + * This is probably not, what you expect. Make sure, that the
> complete data has been
> + * read, if that is what you want. The easiest way to do so is by
> invoking
> + * {@link #consume()}.
> + */
> + public MessageDigest getMessageDigest() {
> + return messageDigest;
> + }
> +}
>
> Propchange:
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
>
> ------------------------------------------------------------------------------
> svn:mime-type = text/plain
>
> Added:
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
> URL:
> http://svn.apache.org/viewvc/commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java?rev=1750760&view=auto
>
> ==============================================================================
> ---
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
> (added)
> +++
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
> Thu Jun 30 09:04:21 2016
> @@ -0,0 +1,238 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.commons.io.input;
> +
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.security.MessageDigest;
> +import java.util.ArrayList;
> +import java.util.List;
> +
> +
> +/**
> + * The {@link ObservableInputStream} allows, that an InputStream may be
> consumed
> + * by other receivers, apart from the thread, which is reading it.
> + * The other consumers are implemented as instances of {@link Observer}. A
> + * typical application may be the generation of a {@link MessageDigest}
> on the
> + * fly.
> + * {@code Note}: The {@link ObservableInputStream} is <em>not</em> thread
> safe,
> + * as instances of InputStream usually aren't.
> + * If you must access the stream from multiple threads, then
> synchronization, locking,
> + * or a similar means must be used.
> + * @see MessageDigestCalculatingInputStream
> + */
> +public class ObservableInputStream extends ProxyInputStream {
> + public static abstract class Observer {
> + /** Called to indicate, that {@link InputStream#read()} has been
> invoked
> + * on the {@link ObservableInputStream}, and will return a value.
> + * @param pByte The value, which is being returned. This will
> never be -1 (EOF),
> + * because, in that case, {link #finished()} will be invoked
> instead.
> + */
> + void data(int pByte) throws IOException {}
> + /** Called to indicate, that {@link InputStream#read(byte[])}, or
> + * {@link InputStream#read(byte[], int, int)} have been called,
> and are about to
> + * invoke data.
> + * @param pBuffer The byte array, which has been passed to the
> read call, and where
> + * data has been stored.
> + * @param pOffset The offset within the byte array, where data
> has been stored.
> + * @param pLength The number of bytes, which have been stored in
> the byte array.
> + */
> + void data(byte[] pBuffer, int pOffset, int pLength) throws
> IOException {}
> + /** Called to indicate, that EOF has been seen on the underlying
> stream.
> + * This method may be called multiple times, if the reader keeps
> invoking
> + * either of the read methods, and they will consequently keep
> returning
> + * EOF.
> + */
> + void finished() throws IOException {}
> + /** Called to indicate, that the {@link ObservableInputStream}
> has been closed.
> + */
> + void closed() throws IOException {}
> + /**
> + * Called to indicate, that an error occurred on the underlying
> stream.
> + */
> + void error(IOException pException) throws IOException { throw
> pException; }
> + }
> +
> + private final List<Observer> observers = new ArrayList<Observer>();
> +
> + public ObservableInputStream(InputStream pProxy) {
> + super(pProxy);
> + }
> +
> + public void add(Observer pObserver) {
> + observers.add(pObserver);
> + }
> +
> + public void remove(Observer pObserver) {
> + observers.remove(pObserver);
> + }
> +
> + public void removeAllObservers() {
> + observers.clear();
> + }
> +
> + @Override
> + public int read() throws IOException {
> + int result = 0;
> + IOException ioe = null;
> + try {
> + result = super.read();
> + } catch (IOException pException) {
> + ioe = pException;
> + }
> + if (ioe != null) {
> + noteError(ioe);
> + } else if (result == -1) {
> + noteFinished();
> + } else {
> + noteDataByte(result);
> + }
> + return result;
> + }
> +
> + @Override
> + public int read(byte[] pBuffer) throws IOException {
> + int result = 0;
> + IOException ioe = null;
> + try {
> + result = super.read(pBuffer);
> + } catch (IOException pException) {
> + ioe = pException;
> + }
> + if (ioe != null) {
> + noteError(ioe);
> + } else if (result == -1) {
> + noteFinished();
> + } else if (result > 0) {
> + noteDataBytes(pBuffer, 0, result);
> + }
> + return result;
> + }
> +
> + @Override
> + public int read(byte[] pBuffer, int pOffset, int pLength) throws
> IOException {
> + int result = 0;
> + IOException ioe = null;
> + try {
> + result = super.read(pBuffer, pOffset, pLength);
> + } catch (IOException pException) {
> + ioe = pException;
> + }
> + if (ioe != null) {
> + noteError(ioe);
> + } else if (result == -1) {
> + noteFinished();
> + } else if (result > 0) {
> + noteDataBytes(pBuffer, pOffset, result);
> + }
> + return result;
> + }
> +
> + /** Notifies the observers by invoking {@link
> Observer#data(byte[],int,int)}
> + * with the given arguments.
> + * @param pBuffer Passed to the observers.
> + * @param pOffset Passed to the observers.
> + * @param pLength Passed to the observers.
> + * @throws IOException Some observer has thrown an exception, which
> is being
> + * passed down.
> + */
> + protected void noteDataBytes(byte[] pBuffer, int pOffset, int
> pLength) throws IOException {
> + for (Observer observer : getObservers()) {
> + observer.data(pBuffer, pOffset, pLength);
> + }
> + }
> +
> + /** Notifies the observers by invoking {@link Observer#finished()}.
> + * @throws IOException Some observer has thrown an exception, which
> is being
> + * passed down.
> + */
> + protected void noteFinished() throws IOException {
> + for (Observer observer : getObservers()) {
> + observer.finished();
> + }
> + }
> +
> + /** Notifies the observers by invoking {@link Observer#data(int)}
> + * with the given arguments.
> + * @param pDataByte Passed to the observers.
> + * @throws IOException Some observer has thrown an exception, which
> is being
> + * passed down.
> + */
> + protected void noteDataByte(int pDataByte) throws IOException {
> + for (Observer observer : getObservers()) {
> + observer.data(pDataByte);
> + }
> + }
> +
> + /** Notifies the observers by invoking {@link
> Observer#error(IOException)}
> + * with the given argument.
> + * @param pException Passed to the observers.
> + * @throws IOException Some observer has thrown an exception, which
> is being
> + * passed down. This may be the same exception, which has been
> passed as an
> + * argument.
> + */
> + protected void noteError(IOException pException) throws IOException {
> + for (Observer observer : getObservers()) {
> + observer.error(pException);
> + }
> + }
> +
> + /** Notifies the observers by invoking {@link Observer#finished()}.
> + * @throws IOException Some observer has thrown an exception, which
> is being
> + * passed down.
> + */
> + protected void noteClosed() throws IOException {
> + for (Observer observer : getObservers()) {
> + observer.closed();
> + }
> + }
> +
> + protected List<Observer> getObservers() {
> + return observers;
> + }
> +
> + @Override
> + public void close() throws IOException {
> + IOException ioe = null;
> + try {
> + super.close();
> + } catch (IOException e) {
> + ioe = e;
> + }
> + if (ioe == null) {
> + noteClosed();
> + } else {
> + noteError(ioe);
> + }
> + }
> +
> + /** Reads all data from the underlying {@link InputStream}, while
> notifying the
> + * observers.
> + * @throws IOException The underlying {@link InputStream}, or either
> of the
> + * observers has thrown an exception.
> + */
> + public void consume() throws IOException {
> + final byte[] buffer = new byte[8192];
> + for (;;) {
> + final int res = read(buffer);
> + if (res == -1) {
> + return;
> + }
> + }
> + }
> +
> +}
>
> Propchange:
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
>
> ------------------------------------------------------------------------------
> svn:mime-type = text/plain
>
> Modified:
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java
> URL:
> http://svn.apache.org/viewvc/commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java?rev=1750760&r1=1750759&r2=1750760&view=diff
>
> ==============================================================================
> ---
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java
> (original)
> +++
> commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java
> Thu Jun 30 09:04:21 2016
> @@ -35,6 +35,7 @@ import java.io.OutputStream;
> *
> * @version $Id$
> * @since 1.4
> + * @see ObservableInputStream
> */
> public class TeeInputStream extends ProxyInputStream {
>
>
> Added:
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
> URL:
> http://svn.apache.org/viewvc/commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java?rev=1750760&view=auto
>
> ==============================================================================
> ---
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
> (added)
> +++
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
> Thu Jun 30 09:04:21 2016
> @@ -0,0 +1,48 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.commons.io.input;
> +
> +import static org.junit.Assert.*;
> +
> +import java.io.ByteArrayInputStream;
> +import java.security.MessageDigest;
> +import java.util.Random;
> +
> +import org.junit.Test;
> +
> +public class MessageDigestCalculatingInputStreamTest {
> + public static byte[] generateRandomByteStream(int pSize) {
> + final byte[] buffer = new byte[pSize];
> + final Random rnd = new Random();
> + rnd.nextBytes(buffer);
> + return buffer;
> + }
> +
> + @Test
> + public void test() throws Exception {
> + for (int i = 256; i < 8192; i = i*2) {
> + final byte[] buffer = generateRandomByteStream(i);
> + final MessageDigest md5Sum = MessageDigest.getInstance("MD5");
> + final byte[] expect = md5Sum.digest(buffer);
> + final MessageDigestCalculatingInputStream md5InputStream =
> new MessageDigestCalculatingInputStream(new ByteArrayInputStream(buffer));
> + md5InputStream.consume();
> + final byte[] got = md5InputStream.getMessageDigest().digest();
> + assertArrayEquals(expect, got);
> + }
> + }
> +
> +}
>
> Propchange:
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
>
> ------------------------------------------------------------------------------
> svn:mime-type = text/plain
>
> Added:
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
> URL:
> http://svn.apache.org/viewvc/commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java?rev=1750760&view=auto
>
> ==============================================================================
> ---
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
> (added)
> +++
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
> Thu Jun 30 09:04:21 2016
> @@ -0,0 +1,134 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.commons.io.input;
> +
> +import static org.junit.Assert.*;
> +
> +import java.io.ByteArrayInputStream;
> +import java.io.IOException;
> +
> +import org.apache.commons.io.input.ObservableInputStream;
> +import org.apache.commons.io.input.ObservableInputStream.Observer;
> +import org.junit.Test;
> +
> +public class ObservableInputStreamTest {
> + private static class LastByteKeepingObserver extends Observer {
> + private int lastByteSeen = -1;
> + private boolean finished;
> + private boolean closed;
> +
> + @Override
> + void data(int pByte) throws IOException {
> + super.data(pByte);
> + lastByteSeen = pByte;
> + }
> +
> + @Override
> + void finished() throws IOException {
> + super.finished();
> + finished = true;
> + }
> +
> + @Override
> + void closed() throws IOException {
> + super.closed();
> + closed = true;
> + }
> + }
> + private static class LastBytesKeepingObserver extends Observer {
> + private byte[] buffer = null;
> + private int offset = -1;
> + private int length = -1;
> +
> + @Override
> + void data(byte[] pBuffer, int pOffset, int pLength) throws
> IOException {
> + super.data(pBuffer, pOffset, pLength);
> + buffer = pBuffer;
> + offset = pOffset;
> + length = pLength;
> + }
> + }
> +
> + /** Tests, that {@link Observer#data(int)} is called.
> + */
> + @Test
> + public void testDataByteCalled() throws Exception {
> + final byte[] buffer =
> MessageDigestCalculatingInputStreamTest.generateRandomByteStream(4096);
> + final ObservableInputStream ois = new ObservableInputStream(new
> ByteArrayInputStream(buffer));
> + final LastByteKeepingObserver lko = new LastByteKeepingObserver();
> + assertEquals(-1, lko.lastByteSeen);
> + ois.read();
> + assertEquals(-1, lko.lastByteSeen);
> + assertFalse(lko.finished);
> + assertFalse(lko.closed);
> + ois.add(lko);
> + for (int i = 1; i < buffer.length; i++) {
> + final int result = ois.read();
> + assertEquals((byte) result, buffer[i]);
> + assertEquals(result, lko.lastByteSeen);
> + assertFalse(lko.finished);
> + assertFalse(lko.closed);
> + }
> + final int result = ois.read();
> + assertEquals(-1, result);
> + assertTrue(lko.finished);
> + assertFalse(lko.closed);
> + ois.close();
> + assertTrue(lko.finished);
> + assertTrue(lko.closed);
> + }
> +
> + /** Tests, that {@link Observer#data(byte[],int,int)} is called.
> + */
> + @Test
> + public void testDataBytesCalled() throws Exception {
> + final byte[] buffer =
> MessageDigestCalculatingInputStreamTest.generateRandomByteStream(4096);
> + ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
> + final ObservableInputStream ois = new ObservableInputStream(bais);
> + final LastBytesKeepingObserver lko = new
> LastBytesKeepingObserver();
> + final byte[] readBuffer = new byte[23];
> + assertEquals(null, lko.buffer);
> + ois.read(readBuffer);
> + assertEquals(null, lko.buffer);
> + ois.add(lko);
> + for (;;) {
> + if (bais.available() >= 2048) {
> + final int result = ois.read(readBuffer);
> + if (result == -1) {
> + ois.close();
> + break;
> + } else {
> + assertEquals(readBuffer, lko.buffer);
> + assertEquals(0, lko.offset);
> + assertEquals(readBuffer.length, lko.length);
> + }
> + } else {
> + final int res = Math.min(11, bais.available());
> + final int result = ois.read(readBuffer, 1, 11);
> + if (result == -1) {
> + ois.close();
> + break;
> + } else {
> + assertEquals(readBuffer, lko.buffer);
> + assertEquals(1, lko.offset);
> + assertEquals(res, lko.length);
> + }
> + }
> + }
> + }
> +
> +}
>
> Propchange:
> commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
>
> ------------------------------------------------------------------------------
> svn:mime-type = text/plain
>
>
>