You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@polygene.apache.org by ni...@apache.org on 2015/07/31 04:47:38 UTC
[19/51] [abbrv] [partial] zest-java git commit: Revert "First round
of changes to move to org.apache.zest namespace."
http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/main/java/org/qi4j/io/Inputs.java
----------------------------------------------------------------------
diff --git a/core/io/src/main/java/org/qi4j/io/Inputs.java b/core/io/src/main/java/org/qi4j/io/Inputs.java
new file mode 100644
index 0000000..eb2001f
--- /dev/null
+++ b/core/io/src/main/java/org/qi4j/io/Inputs.java
@@ -0,0 +1,490 @@
+/*
+ * Copyright (c) 2010, Rickard Öberg. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.qi4j.io;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Scanner;
+import java.util.zip.GZIPInputStream;
+import org.qi4j.functional.Visitor;
+
+/**
+ * Common inputs
+ */
+public class Inputs
+{
+ // START SNIPPET: method
+
+ /**
+ * Read lines from a String.
+ *
+ * @param source lines
+ *
+ * @return Input that provides lines from the string as strings
+ */
+ public static Input<String, RuntimeException> text( final String source )
+ // END SNIPPET: method
+ {
+ return new Input<String, RuntimeException>()
+ {
+ @Override
+ public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
+ throws RuntimeException, ReceiverThrowableType
+ {
+
+ output.receiveFrom( new Sender<String, RuntimeException>()
+ {
+ @Override
+ public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver )
+ throws Receiver2ThrowableType, RuntimeException
+ {
+ Scanner scanner = new Scanner( source );
+ while( scanner.hasNextLine() )
+ {
+ receiver.receive( scanner.nextLine() );
+ }
+ }
+ } );
+ }
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Read lines from a Reader.
+ *
+ * @param source lines
+ *
+ * @return Input that provides lines from the string as strings
+ */
+ public static Input<String, RuntimeException> text( final Reader source )
+ // END SNIPPET: method
+ {
+ return new Input<String, RuntimeException>()
+ {
+ @Override
+ public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
+ throws RuntimeException, ReceiverThrowableType
+ {
+
+ output.receiveFrom( new Sender<String, RuntimeException>()
+ {
+ @Override
+ public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver )
+ throws Receiver2ThrowableType, RuntimeException
+ {
+ Scanner scanner = new Scanner( source );
+ while( scanner.hasNextLine() )
+ {
+ receiver.receive( scanner.nextLine() );
+ }
+ }
+ } );
+ }
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Read lines from a UTF-8 encoded textfile.
+ *
+ * If the filename ends with .gz, then the data is automatically unzipped when read.
+ *
+ * @param source textfile with lines separated by \n character
+ *
+ * @return Input that provides lines from the textfiles as strings
+ */
+ public static Input<String, IOException> text( final File source )
+ // END SNIPPET: method
+ {
+ return text( source, "UTF-8" );
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Read lines from a textfile with the given encoding.
+ *
+ * If the filename ends with .gz, then the data is automatically unzipped when read.
+ *
+ * @param source textfile with lines separated by \n character
+ * @param encoding encoding of file, e.g. "UTF-8"
+ *
+ * @return Input that provides lines from the textfiles as strings
+ */
+ public static Input<String, IOException> text( final File source, final String encoding )
+ // END SNIPPET: method
+ {
+ return new Input<String, IOException>()
+ {
+ @Override
+ public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
+ throws IOException, ReceiverThrowableType
+ {
+ InputStream stream = new FileInputStream( source );
+
+ // If file is gzipped, unzip it automatically
+ if( source.getName().endsWith( ".gz" ) )
+ {
+ stream = new GZIPInputStream( stream );
+ }
+
+ try (BufferedReader reader = new BufferedReader( new InputStreamReader( stream, encoding ) ))
+ {
+ output.receiveFrom( new Sender<String, IOException>()
+ {
+ @Override
+ public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver )
+ throws Receiver2ThrowableType, IOException
+ {
+ String line;
+ while( ( line = reader.readLine() ) != null )
+ {
+ receiver.receive( line );
+ }
+ }
+ } );
+ }
+ }
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Read lines from a textfile at a given URL.
+ *
+ * If the content support gzip encoding, then the data is automatically unzipped when read.
+ *
+ * The charset in the content-type of the URL will be used for parsing. Default is UTF-8.
+ *
+ * @param source textfile with lines separated by \n character
+ *
+ * @return Input that provides lines from the textfiles as strings
+ */
+ public static Input<String, IOException> text( final URL source )
+ // END SNIPPET: method
+ {
+ return new Input<String, IOException>()
+ {
+ @Override
+ public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
+ throws IOException, ReceiverThrowableType
+ {
+ URLConnection urlConnection = source.openConnection();
+ urlConnection.setRequestProperty( "Accept-Encoding", "gzip" );
+ InputStream stream = urlConnection.getInputStream();
+
+ // If file is gzipped, unzip it automatically
+ if( "gzip".equals( urlConnection.getContentEncoding() ) )
+ {
+ stream = new GZIPInputStream( stream );
+ }
+
+ // Figure out charset given content-type
+ String contentType = urlConnection.getContentType();
+ String charSet = "UTF-8";
+ if( contentType.contains( "charset=" ) )
+ {
+ charSet = contentType.substring( contentType.indexOf( "charset=" ) + "charset=".length() );
+ }
+
+ try (BufferedReader reader = new BufferedReader( new InputStreamReader( stream, charSet ) ))
+ {
+ output.receiveFrom( new Sender<String, IOException>()
+ {
+ @Override
+ public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver )
+ throws Receiver2ThrowableType, IOException
+ {
+ String line;
+ while( ( line = reader.readLine() ) != null )
+ {
+ receiver.receive( line );
+ }
+ }
+ } );
+ }
+ }
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Read a file using ByteBuffer of a given size. Useful for transferring raw data.
+ *
+ * @param source The file to be read.
+ * @param bufferSize The size of the byte array.
+ *
+ * @return An Input instance to be applied to streaming operations.
+ */
+ public static Input<ByteBuffer, IOException> byteBuffer( final File source, final int bufferSize )
+ // END SNIPPET: method
+ {
+ return new Input<ByteBuffer, IOException>()
+ {
+ @Override
+ public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super ByteBuffer, ReceiverThrowableType> output )
+ throws IOException, ReceiverThrowableType
+ {
+ final FileInputStream stream = new FileInputStream( source );
+ final FileChannel fci = stream.getChannel();
+
+ final ByteBuffer buffer = ByteBuffer.allocate( bufferSize );
+
+ try
+ {
+ output.receiveFrom( new Sender<ByteBuffer, IOException>()
+ {
+ @Override
+ public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super ByteBuffer, Receiver2ThrowableType> receiver )
+ throws Receiver2ThrowableType, IOException
+ {
+ while( fci.read( buffer ) != -1 )
+ {
+ buffer.flip();
+ receiver.receive( buffer );
+ buffer.clear();
+ }
+ }
+ } );
+ }
+ finally
+ {
+ stream.close();
+ }
+ }
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Read an inputstream using ByteBuffer of a given size.
+ *
+ * @param source The InputStream to be read.
+ * @param bufferSize The size of the byte array.
+ *
+ * @return An Input instance to be applied to streaming operations.
+ */
+ public static Input<ByteBuffer, IOException> byteBuffer( final InputStream source, final int bufferSize )
+ // END SNIPPET: method
+ {
+ return new Input<ByteBuffer, IOException>()
+ {
+ @Override
+ public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super ByteBuffer, ReceiverThrowableType> output )
+ throws IOException, ReceiverThrowableType
+ {
+ try
+ {
+ output.receiveFrom( new Sender<ByteBuffer, IOException>()
+ {
+ @Override
+ public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super ByteBuffer, Receiver2ThrowableType> receiver )
+ throws Receiver2ThrowableType, IOException
+ {
+ byte[] buffer = new byte[ bufferSize ];
+
+ int len;
+ while( ( len = source.read( buffer ) ) != -1 )
+ {
+ ByteBuffer byteBuffer = ByteBuffer.wrap( buffer, 0, len );
+ receiver.receive( byteBuffer );
+ }
+ }
+ } );
+ }
+ finally
+ {
+ source.close();
+ }
+ }
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Combine many Input into one single Input. When a transfer is initiated from it all items from all inputs will be transferred
+ * to the given Output.
+ *
+ * @param inputs An Iterable of Input instances to be combined.
+ * @param <T> The item type of the Input
+ * @param <SenderThrowableType> The Throwable that might be thrown by the Inputs.
+ *
+ * @return A combined Input, allowing for easy aggregation of many Input sources.
+ */
+ public static <T, SenderThrowableType extends Throwable> Input<T, SenderThrowableType> combine( final Iterable<Input<T, SenderThrowableType>> inputs )
+ // END SNIPPET: method
+ {
+ return new Input<T, SenderThrowableType>()
+ {
+ @Override
+ public <Receiver2ThrowableType extends Throwable> void transferTo( Output<? super T, Receiver2ThrowableType> output )
+ throws SenderThrowableType, Receiver2ThrowableType
+ {
+ output.receiveFrom( new Sender<T, SenderThrowableType>()
+ {
+ @Override
+ public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super T, ReceiverThrowableType> receiver )
+ throws ReceiverThrowableType, SenderThrowableType
+ {
+ for( Input<T, SenderThrowableType> input : inputs )
+ {
+ input.transferTo( new Output<T, ReceiverThrowableType>()
+ {
+ @Override
+ public <Sender2ThrowableType extends Throwable> void receiveFrom( Sender<? extends T, Sender2ThrowableType> sender )
+ throws ReceiverThrowableType, Sender2ThrowableType
+ {
+ sender.sendTo( new Receiver<T, ReceiverThrowableType>()
+ {
+ @Override
+ public void receive( T item )
+ throws ReceiverThrowableType
+ {
+ receiver.receive( item );
+ }
+ } );
+ }
+ } );
+ }
+ }
+ } );
+ }
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Create an Input that takes its items from the given Iterable.
+ *
+ * @param iterable The Iterable to be used as an Input.
+ * @param <T> The item type of the Input
+ *
+ * @return An Input instance that is backed by the Iterable.
+ */
+ public static <T> Input<T, RuntimeException> iterable( final Iterable<T> iterable )
+ // END SNIPPET: method
+ {
+ return new Input<T, RuntimeException>()
+ {
+ @Override
+ public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super T, ReceiverThrowableType> output )
+ throws RuntimeException, ReceiverThrowableType
+ {
+ output.receiveFrom( new Sender<T, RuntimeException>()
+ {
+ @Override
+ public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super T, Receiver2ThrowableType> receiver )
+ throws Receiver2ThrowableType, RuntimeException
+ {
+ for( T item : iterable )
+ {
+ receiver.receive( item );
+ }
+ }
+ } );
+ }
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Create an Input that allows a Visitor to write to an OutputStream. The stream is a BufferedOutputStream, so when enough
+ * data has been gathered it will send this in chunks of the given size to the Output it is transferred to. The Visitor does not have to call
+ * close() on the OutputStream, but should ensure that any wrapper streams or writers are flushed so that all data is sent.
+ *
+ * @param outputVisitor The OutputStream Visitor that will be backing the Input ByteBuffer.
+ * @param bufferSize The buffering size.
+ *
+ * @return An Input instance of ByteBuffer, that is backed by an Visitor to an OutputStream.
+ */
+ public static Input<ByteBuffer, IOException> output( final Visitor<OutputStream, IOException> outputVisitor,
+ final int bufferSize
+ )
+ // END SNIPPET: method
+ {
+ return new Input<ByteBuffer, IOException>()
+ {
+ @Override
+ public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super ByteBuffer, ReceiverThrowableType> output )
+ throws IOException, ReceiverThrowableType
+ {
+ output.receiveFrom( new Sender<ByteBuffer, IOException>()
+ {
+ @Override
+ @SuppressWarnings( "unchecked" )
+ public <Receiver2ThrowableType extends Throwable> void sendTo( final Receiver<? super ByteBuffer, Receiver2ThrowableType> receiver )
+ throws Receiver2ThrowableType, IOException
+ {
+ try (OutputStream out = new BufferedOutputStream( new OutputStream()
+ {
+ @Override
+ public void write( int b )
+ throws IOException
+ {
+ // Ignore
+ }
+
+ @SuppressWarnings( "NullableProblems" )
+ @Override
+ public void write( byte[] b, int off, int len )
+ throws IOException
+ {
+ try
+ {
+ ByteBuffer byteBuffer = ByteBuffer.wrap( b, 0, len );
+ receiver.receive( byteBuffer );
+ }
+ catch( Throwable ex )
+ {
+ throw new IOException( ex );
+ }
+ }
+ }, bufferSize ))
+ {
+ outputVisitor.visit( out );
+ }
+ catch( IOException ex )
+ {
+ throw (Receiver2ThrowableType) ex.getCause();
+ }
+ }
+ } );
+ }
+ };
+ }
+
+ private Inputs()
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/main/java/org/qi4j/io/Output.java
----------------------------------------------------------------------
diff --git a/core/io/src/main/java/org/qi4j/io/Output.java b/core/io/src/main/java/org/qi4j/io/Output.java
new file mode 100644
index 0000000..3dcd207
--- /dev/null
+++ b/core/io/src/main/java/org/qi4j/io/Output.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2010, Rickard Öberg. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.qi4j.io;
+
+/**
+ * Output for data.
+ */
+// START SNIPPET: output
+public interface Output<T, ReceiverThrowableType extends Throwable>
+{
+// END SNIPPET: output
+
+ /**
+ * This initiates a transfer from an Input. Implementations should open any resources to be written to
+ * and then call sender.sendTo() when it is ready to receive data. When sendTo() returns the resource should be
+ * closed properly. Make sure to handle any exceptions from sendTo.
+ *
+ * @param sender the sender of data to this output
+ * @param <SenderThrowableType> the exception that sendTo can throw
+ *
+ * @throws SenderThrowableType the exception that the sender can throw
+ * @throws ReceiverThrowableType the exception that this output can throw from receiveItem()
+ */
+// START SNIPPET: output
+ <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender )
+ throws ReceiverThrowableType, SenderThrowableType;
+}
+// END SNIPPET: output
http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/main/java/org/qi4j/io/Outputs.java
----------------------------------------------------------------------
diff --git a/core/io/src/main/java/org/qi4j/io/Outputs.java b/core/io/src/main/java/org/qi4j/io/Outputs.java
new file mode 100644
index 0000000..2508788
--- /dev/null
+++ b/core/io/src/main/java/org/qi4j/io/Outputs.java
@@ -0,0 +1,528 @@
+/*
+ * Copyright (c) 2010, Rickard Öberg. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.qi4j.io;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Collection;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Utility methods for creating standard Outputs
+ */
+public class Outputs
+{
+ // START SNIPPET: method
+
+ /**
+ * Write lines to a text file with UTF-8 encoding. Separate each line with a newline ("\n" character). If the writing or sending fails,
+ * the file is deleted.
+ * <p>
+ * If the filename ends with .gz, then the data is automatically GZipped.
+ * </p>
+ * @param file the file to save the text to
+ *
+ * @return an Output for storing text in a file
+ */
+ public static Output<String, IOException> text( final File file )
+ // END SNIPPET: method
+ {
+ return text( file, "UTF-8" );
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Write lines to a text file. Separate each line with a newline ("\n" character). If the writing or sending fails,
+ * the file is deleted.
+ * <p>
+ * If the filename ends with .gz, then the data is automatically GZipped.
+ * </p>
+ * @param file the file to save the text to
+ *
+ * @return an Output for storing text in a file
+ */
+ public static Output<String, IOException> text( final File file, final String encoding )
+ // END SNIPPET: method
+ {
+ return new Output<String, IOException>()
+ {
+ @Override
+ @SuppressWarnings( "unchecked" )
+ public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
+ throws IOException, SenderThrowableType
+ {
+ File tmpFile = Files.createTemporayFileOf( file );
+
+ OutputStream stream = new FileOutputStream( tmpFile );
+
+ // If file should be gzipped, do that automatically
+ if( file.getName().endsWith( ".gz" ) )
+ {
+ stream = new GZIPOutputStream( stream );
+ }
+
+ final BufferedWriter writer = new BufferedWriter( new OutputStreamWriter( stream, encoding ) );
+
+ try
+ {
+ sender.sendTo( new Receiver<String, IOException>()
+ {
+ @Override
+ public void receive( String item )
+ throws IOException
+ {
+ writer.append( item ).append( '\n' );
+ }
+ } );
+ writer.close();
+
+ // Replace file with temporary file
+ if( !file.exists() || file.delete() )
+ {
+ if( ! tmpFile.renameTo( file ) )
+ {
+ // TODO: What?? Throw an Exception?
+ System.err.println( "Unable to rename file: " + tmpFile + " to " + file );
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ // We failed writing - close and delete
+ writer.close();
+ if( ! tmpFile.delete() )
+ {
+ System.err.println("Unable to delete temporary file." );
+ tmpFile.deleteOnExit();
+ }
+ }
+ catch( Throwable senderThrowableType )
+ {
+ // We failed writing - close and delete
+ writer.close();
+ if( ! tmpFile.delete() )
+ {
+ System.err.println("Unable to delete temporary file." );
+ tmpFile.deleteOnExit();
+ }
+ throw (SenderThrowableType) senderThrowableType;
+ }
+ }
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Write lines to a Writer. Separate each line with a newline ("\n" character).
+ *
+ * @param writer the Writer to write the text to
+ * @return an Output for storing text in a Writer
+ */
+ public static Output<String, IOException> text( final Writer writer )
+ // END SNIPPET: method
+ {
+ return new Output<String, IOException>()
+ {
+
+ @Override
+ public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
+ throws IOException, SenderThrowableType
+ {
+ sender.sendTo( new Receiver<String, IOException>()
+ {
+
+ @Override
+ public void receive( String item )
+ throws IOException
+ {
+ writer.append( item ).append( "\n" );
+ }
+
+ } );
+ }
+
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Write lines to a StringBuilder. Separate each line with a newline ("\n" character).
+ *
+ * @param builder the StringBuilder to append the text to
+ * @return an Output for storing text in a StringBuilder
+ */
+ public static Output<String, IOException> text( final StringBuilder builder )
+ // END SNIPPET: method
+ {
+ return new Output<String, IOException>()
+ {
+
+ @Override
+ public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
+ throws IOException, SenderThrowableType
+ {
+ sender.sendTo( new Receiver<String, IOException>()
+ {
+
+ @Override
+ public void receive( String item )
+ throws IOException
+ {
+ builder.append( item ).append( "\n" );
+ }
+
+ } );
+ }
+
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Write ByteBuffer data to a file. If the writing or sending of data fails the file will be deleted.
+ *
+ * @param file The destination file.
+ *
+ * @return The Output ByteBuffer instance backed by a File.
+ */
+ public static Output<ByteBuffer, IOException> byteBuffer( final File file )
+ // END SNIPPET: method
+ {
+ return new Output<ByteBuffer, IOException>()
+ {
+ @Override
+ @SuppressWarnings( "unchecked" )
+ public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends ByteBuffer, SenderThrowableType> sender )
+ throws IOException, SenderThrowableType
+ {
+ File tmpFile = Files.createTemporayFileOf( file );
+ FileOutputStream stream = new FileOutputStream( tmpFile );
+ final FileChannel fco = stream.getChannel();
+
+ try
+ {
+ sender.sendTo( new Receiver<ByteBuffer, IOException>()
+ {
+ @Override
+ public void receive( ByteBuffer item )
+ throws IOException
+ {
+ fco.write( item );
+ }
+ } );
+ stream.close();
+
+ // Replace file with temporary file
+ if( !file.exists() || file.delete() )
+ {
+ if( ! tmpFile.renameTo( file ) )
+ {
+ // TODO: What can be done in this case?
+ System.err.println( "Unable to rename file: " + tmpFile + " to " + file );
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ // We failed writing - close and delete
+ stream.close();
+ if( ! tmpFile.delete() )
+ {
+ System.err.println("Unable to delete temporary file." );
+ tmpFile.deleteOnExit();
+ }
+
+ }
+ catch( Throwable senderThrowableType )
+ {
+ // We failed writing - close and delete
+ stream.close();
+ if( ! tmpFile.delete() )
+ {
+ System.err.println("Unable to delete temporary file." );
+ tmpFile.deleteOnExit();
+ }
+ throw (SenderThrowableType) senderThrowableType;
+ }
+ }
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Write ByteBuffer data to an OutputStream.
+ *
+ * @param stream Destination OutputStream
+ *
+ * @return The Output of ByteBuffer that will be backed by the OutputStream.
+ */
+ public static Output<ByteBuffer, IOException> byteBuffer( final OutputStream stream )
+ // END SNIPPET: method
+ {
+ return new Output<ByteBuffer, IOException>()
+ {
+ @Override
+ public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends ByteBuffer, SenderThrowableType> sender )
+ throws IOException, SenderThrowableType
+ {
+ try
+ {
+ sender.sendTo( new Receiver<ByteBuffer, IOException>()
+ {
+ @Override
+ public void receive( ByteBuffer item )
+ throws IOException
+ {
+ if( item.hasArray() )
+ {
+ stream.write( item.array(), item.arrayOffset(), item.limit() );
+ }
+ else
+ {
+ for( int i = 0; i < item.limit(); i++ )
+ {
+ stream.write( item.get( i ) );
+ }
+ }
+ }
+ } );
+ }
+ finally
+ {
+ stream.close();
+ }
+ }
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Write byte array data to a file. If the writing or sending of data fails the file will be deleted.
+ *
+ * @param file The File to be written to.
+ * @param bufferSize The size of the ByteBuffer.
+ *
+ * @return An Output instance that will write to the given File.
+ */
+ public static Output<byte[], IOException> bytes( final File file, final int bufferSize )
+ // END SNIPPET: method
+ {
+ return new Output<byte[], IOException>()
+ {
+ @Override
+ @SuppressWarnings( "unchecked" )
+ public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends byte[], SenderThrowableType> sender )
+ throws IOException, SenderThrowableType
+ {
+ File tmpFile = Files.createTemporayFileOf( file );
+ final OutputStream stream = new BufferedOutputStream( new FileOutputStream( tmpFile ), bufferSize );
+
+ try
+ {
+ sender.sendTo( new Receiver<byte[], IOException>()
+ {
+ @Override
+ public void receive( byte[] item )
+ throws IOException
+ {
+ stream.write( item );
+ }
+ } );
+ stream.close();
+
+ // Replace file with temporary file
+ if( !file.exists() || file.delete() )
+ {
+ if( ! tmpFile.renameTo( file ) )
+ {
+ // TODO: WHAT???
+ System.err.println( "Unable to rename " + tmpFile + " to " + file );
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ // We failed writing - close and delete
+ stream.close();
+ if( ! tmpFile.delete() )
+ {
+ System.err.println("Unable to delete temporary file." );
+ tmpFile.deleteOnExit();
+ }
+ }
+ catch( Throwable senderThrowableType )
+ {
+ // We failed writing - close and delete
+ stream.close();
+ if( ! tmpFile.delete() )
+ {
+ System.err.println("Unable to delete temporary file." );
+ tmpFile.deleteOnExit();
+ }
+ throw (SenderThrowableType) senderThrowableType;
+ }
+ }
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Do nothing. Use this if you have all logic in filters and/or specifications
+ *
+ * @param <T> The item type.
+ *
+ * @return An Output instance that ignores all data.
+ */
+ public static <T> Output<T, RuntimeException> noop()
+ // END SNIPPET: method
+ {
+ return withReceiver( new Receiver<T, RuntimeException>()
+ {
+ @Override
+ public void receive( T item )
+ throws RuntimeException
+ {
+ // Do nothing
+ }
+ } );
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Use given receiver as Output. Use this if there is no need to create a "transaction" for each transfer, and no need
+ * to do batch writes or similar.
+ *
+ * @param <T> The item type
+ * @param receiver receiver for this Output
+ *
+ * @return An Output instance backed by a Receiver of items.
+ */
+ public static <T, ReceiverThrowableType extends Throwable> Output<T, ReceiverThrowableType> withReceiver( final Receiver<T, ReceiverThrowableType> receiver )
+ // END SNIPPET: method
+ {
+ return new Output<T, ReceiverThrowableType>()
+ {
+ @Override
+ public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender )
+ throws ReceiverThrowableType, SenderThrowableType
+ {
+ sender.sendTo( receiver );
+ }
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Write objects to System.out.println.
+ *
+ * @return An Output instance that is backed by System.out
+ */
+ public static Output<Object, RuntimeException> systemOut()
+ // END SNIPPET: method
+ {
+ return new Output<Object, RuntimeException>()
+ {
+ @Override
+ public <SenderThrowableType extends Throwable> void receiveFrom( Sender<?, SenderThrowableType> sender )
+ throws RuntimeException, SenderThrowableType
+ {
+ sender.sendTo( new Receiver<Object, RuntimeException>()
+ {
+ @Override
+ public void receive( Object item )
+ {
+ System.out.println( item );
+ }
+ } );
+ }
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Write objects to System.err.println.
+ *
+ * @return An Output instance backed by System.in
+ */
+ @SuppressWarnings( "UnusedDeclaration" )
+ public static Output<Object, RuntimeException> systemErr()
+ // END SNIPPET: method
+ {
+ return new Output<Object, RuntimeException>()
+ {
+ @Override
+ public <SenderThrowableType extends Throwable> void receiveFrom( Sender<?, SenderThrowableType> sender )
+ throws RuntimeException, SenderThrowableType
+ {
+ sender.sendTo( new Receiver<Object, RuntimeException>()
+ {
+ @Override
+ public void receive( Object item )
+ {
+ System.err.println( item );
+ }
+ } );
+ }
+ };
+ }
+
+ // START SNIPPET: method
+
+ /**
+ * Add items to a collection
+ */
+ public static <T> Output<T, RuntimeException> collection( final Collection<T> collection )
+ // END SNIPPET: method
+ {
+ return new Output<T, RuntimeException>()
+ {
+ @Override
+ public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender )
+ throws RuntimeException, SenderThrowableType
+ {
+ sender.sendTo( new Receiver<T, RuntimeException>()
+ {
+ @Override
+ public void receive( T item )
+ throws RuntimeException
+ {
+ collection.add( item );
+ }
+ } );
+ }
+ };
+ }
+
+ private Outputs()
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/main/java/org/qi4j/io/Receiver.java
----------------------------------------------------------------------
diff --git a/core/io/src/main/java/org/qi4j/io/Receiver.java b/core/io/src/main/java/org/qi4j/io/Receiver.java
new file mode 100644
index 0000000..6318cdf
--- /dev/null
+++ b/core/io/src/main/java/org/qi4j/io/Receiver.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2010, Rickard Öberg. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.qi4j.io;
+
+/**
+ * Receiver of items during a specific transfer from an Input to an Output.
+ */
+// START SNIPPET: receiver
+public interface Receiver<T, ReceiverThrowableType extends Throwable>
+{
+// END SNIPPET: receiver
+ /**
+ * Receive a single item of the given type. The receiver should process it
+ * and optionally throw an exception if it fails.
+ *
+ * @param item
+ *
+ * @throws ReceiverThrowableType
+ */
+// START SNIPPET: receiver
+ void receive( T item )
+ throws ReceiverThrowableType;
+}
+// END SNIPPET: receiver
http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/main/java/org/qi4j/io/Sender.java
----------------------------------------------------------------------
diff --git a/core/io/src/main/java/org/qi4j/io/Sender.java b/core/io/src/main/java/org/qi4j/io/Sender.java
new file mode 100644
index 0000000..05ac007
--- /dev/null
+++ b/core/io/src/main/java/org/qi4j/io/Sender.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2010, Rickard Öberg. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.qi4j.io;
+
+/**
+ * Sender of items for a particular transfer from an Input to an Output
+ */
+// START SNIPPET: sender
+public interface Sender<T, SenderThrowableType extends Throwable>
+{
+// END SNIPPET: sender
+ /**
+ * The sender should send all items it holds to the receiver by invoking receiveItem for each item.
+ *
+ * If the receive fails it should properly close any open resources.
+ *
+ * @param receiver
+ * @param <ReceiverThrowableType>
+ *
+ * @throws ReceiverThrowableType
+ * @throws SenderThrowableType
+ */
+// START SNIPPET: sender
+ <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super T, ReceiverThrowableType> receiver )
+ throws ReceiverThrowableType, SenderThrowableType;
+}
+// END SNIPPET: sender
http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/main/java/org/qi4j/io/Transforms.java
----------------------------------------------------------------------
diff --git a/core/io/src/main/java/org/qi4j/io/Transforms.java b/core/io/src/main/java/org/qi4j/io/Transforms.java
new file mode 100644
index 0000000..a5d0040
--- /dev/null
+++ b/core/io/src/main/java/org/qi4j/io/Transforms.java
@@ -0,0 +1,435 @@
+/*
+ * Copyright (c) 2010, Rickard Öberg. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.qi4j.io;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.text.MessageFormat;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.logging.Logger;
+import org.qi4j.functional.Function;
+import org.qi4j.functional.Specification;
+
+/**
+ * Utility class for I/O transforms
+ */
+public class Transforms
+{
+ /**
+ * Filter items in a transfer by applying the given Specification to each item.
+ *
+ * @param specification The Specification defining the items to not filter away.
+ * @param output The Output instance to receive to result.
+ * @param <T> The item type
+ * @param <Receiver2ThrowableType> Exception type that might be thrown by the Receiver.
+ *
+ * @return And Output encapsulation the filter operation.
+ */
+ public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> filter( final Specification<? super T> specification,
+ final Output<T, Receiver2ThrowableType> output
+ )
+ {
+ return new Output<T, Receiver2ThrowableType>()
+ {
+ @Override
+ public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends T, SenderThrowableType> sender )
+ throws Receiver2ThrowableType, SenderThrowableType
+ {
+ output.receiveFrom( new Sender<T, SenderThrowableType>()
+ {
+ @Override
+ public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super T, ReceiverThrowableType> receiver )
+ throws ReceiverThrowableType, SenderThrowableType
+ {
+ sender.sendTo( new Receiver<T, ReceiverThrowableType>()
+ {
+ @Override
+ public void receive( T item )
+ throws ReceiverThrowableType
+ {
+ if( specification.satisfiedBy( item ) )
+ {
+ receiver.receive( item );
+ }
+ }
+ } );
+ }
+ } );
+ }
+ };
+ }
+
+ /**
+ * Map items in a transfer from one type to another by applying the given function.
+ *
+ * @param function The transformation function to apply to the streaming items.
+ * @param output The output to receive the transformed items.
+ * @param <From> The type of the incoming items.
+ * @param <To> The type of the transformed items.
+ * @param <Receiver2ThrowableType> The exception type that the Receiver might throw.
+ *
+ * @return An Output instance that encapsulates the map transformation.
+ */
+ public static <From, To, Receiver2ThrowableType extends Throwable> Output<From, Receiver2ThrowableType> map( final Function<? super From, ? extends To> function,
+ final Output<To, Receiver2ThrowableType> output
+ )
+ {
+ return new Output<From, Receiver2ThrowableType>()
+ {
+ @Override
+ public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends From, SenderThrowableType> sender )
+ throws Receiver2ThrowableType, SenderThrowableType
+ {
+ output.receiveFrom( new Sender<To, SenderThrowableType>()
+ {
+ @Override
+ public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super To, ReceiverThrowableType> receiver )
+ throws ReceiverThrowableType, SenderThrowableType
+ {
+ sender.sendTo( new Receiver<From, ReceiverThrowableType>()
+ {
+ @Override
+ public void receive( From item )
+ throws ReceiverThrowableType
+ {
+ receiver.receive( function.map( item ) );
+ }
+ } );
+ }
+ } );
+ }
+ };
+ }
+
+ /**
+ * Apply the given function to items in the transfer that match the given specification. Other items will pass
+ * through directly.
+ *
+ * @param specification The Specification defining which items should be transformed.
+ * @param function The transformation function.
+ * @param output The Output that will receive the resulting items.
+ * @param <T> The item type. Items can not be transformed to a new type.
+ * @param <Receiver2ThrowableType> The exception that the Receiver might throw.
+ *
+ * @return An Output instance that encapsulates the operation.
+ */
+ public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> filteredMap( final Specification<? super T> specification,
+ final Function<? super T, ? extends T> function,
+ final Output<T, Receiver2ThrowableType> output
+ )
+ {
+ return new Output<T, Receiver2ThrowableType>()
+ {
+ @Override
+ public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends T, SenderThrowableType> sender )
+ throws Receiver2ThrowableType, SenderThrowableType
+ {
+ output.receiveFrom( new Sender<T, SenderThrowableType>()
+ {
+ @Override
+ public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super T, ReceiverThrowableType> receiver )
+ throws ReceiverThrowableType, SenderThrowableType
+ {
+ sender.sendTo( new Receiver<T, ReceiverThrowableType>()
+ {
+ @Override
+ public void receive( T item )
+ throws ReceiverThrowableType
+ {
+ if( specification.satisfiedBy( item ) )
+ {
+ receiver.receive( function.map( item ) );
+ }
+ else
+ {
+ receiver.receive( item );
+ }
+ }
+ } );
+ }
+ } );
+ }
+ };
+ }
+
+ /**
+ * Wrapper for Outputs that uses a lock whenever a transfer is instantiated. Typically a read-lock would be used on
+ * the sending side and a write-lock would be used on the receiving side. Inputs can use this as well to create a
+ * wrapper on the send side when transferTo is invoked.
+ *
+ * @param lock the lock to be used for transfers
+ * @param output output to be wrapped
+ * @param <T> The Item type
+ * @param <Receiver2ThrowableType> The Exception type that the Receiver might throw.
+ *
+ * @return Output wrapper that uses the given lock during transfers.
+ */
+ public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> lock( final Lock lock,
+ final Output<T, Receiver2ThrowableType> output
+ )
+ {
+ return new Output<T, Receiver2ThrowableType>()
+ {
+ @Override
+ public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender )
+ throws Receiver2ThrowableType, SenderThrowableType
+ {
+ /**
+ * Fix for this bug:
+ * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6822370
+ */
+ while( true )
+ {
+ try
+ {
+ //noinspection StatementWithEmptyBody
+ while( !lock.tryLock( 1000, TimeUnit.MILLISECONDS ) )
+ {
+ // On timeout, try again
+ }
+ break; // Finally got a lock
+ }
+ catch( InterruptedException e )
+ {
+ // Try again
+ }
+ }
+
+ try
+ {
+ output.receiveFrom( sender );
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+ };
+ }
+
+ /**
+ * Wrapper for Outputs that uses a lock whenever a transfer is instantiated. Typically a read-lock would be used on the sending side and a write-lock
+ * would be used on the receiving side.
+ *
+ * @param lock the lock to be used for transfers
+ * @param input input to be wrapped
+ * @param <T> The item type.
+ * @param <SenderThrowableType> The Exception type that the Sender might throw.
+ *
+ * @return Input wrapper that uses the given lock during transfers.
+ */
+ public static <T, SenderThrowableType extends Throwable> Input<T, SenderThrowableType> lock( final Lock lock,
+ final Input<T, SenderThrowableType> input
+ )
+ {
+ return new Input<T, SenderThrowableType>()
+ {
+ @Override
+ public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super T, ReceiverThrowableType> output )
+ throws SenderThrowableType, ReceiverThrowableType
+ {
+ /**
+ * Fix for this bug:
+ * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6822370
+ */
+ while( true )
+ {
+ try
+ {
+ //noinspection StatementWithEmptyBody
+ while( !( lock.tryLock() || lock.tryLock( 1000, TimeUnit.MILLISECONDS ) ) )
+ {
+ // On timeout, try again
+ }
+ break; // Finally got a lock
+ }
+ catch( InterruptedException e )
+ {
+ // Try again
+ }
+ }
+
+ try
+ {
+ input.transferTo( output );
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+ };
+ }
+
+ /**
+ * Count the number of items in the transfer.
+ *
+ * @param <T>
+ */
+ // START SNIPPET: counter
+ public static class Counter<T>
+ implements Function<T, T>
+ {
+ private volatile long count = 0;
+
+ public long count()
+ {
+ return count;
+ }
+
+ @Override
+ public T map( T t )
+ {
+ count++;
+ return t;
+ }
+ }
+ // END SNIPPET: counter
+
+ /**
+ * Convert strings to bytes using the given CharSet
+ */
+ @SuppressWarnings( "UnusedDeclaration" )
+ public static class String2Bytes
+ implements Function<String, byte[]>
+ {
+ private Charset charSet;
+
+ public String2Bytes( Charset charSet )
+ {
+ this.charSet = charSet;
+ }
+
+ @Override
+ public byte[] map( String s )
+ {
+ return s.getBytes( charSet );
+ }
+ }
+
+ /**
+ * Convert ByteBuffers to Strings using the given CharSet
+ */
+ public static class ByteBuffer2String
+ implements Function<ByteBuffer, String>
+ {
+ private Charset charSet;
+
+ public ByteBuffer2String( Charset charSet )
+ {
+ this.charSet = charSet;
+ }
+
+ @Override
+ public String map( ByteBuffer buffer )
+ {
+ return new String( buffer.array(), charSet );
+ }
+ }
+
+ /**
+ * Convert objects to Strings using .toString()
+ */
+ @SuppressWarnings( "UnusedDeclaration" )
+ public static class ObjectToString
+ implements Function<Object, String>
+ {
+ @Override
+ public String map( Object o )
+ {
+ return o.toString();
+ }
+ }
+
+ /**
+ * Log the toString() representation of transferred items to the given log. The string is first formatted using MessageFormat
+ * with the given format.
+ *
+ * @param <T>
+ */
+ public static class Log<T>
+ implements Function<T, T>
+ {
+ private Logger logger;
+ private MessageFormat format;
+
+ public Log( Logger logger, String format )
+ {
+ this.logger = logger;
+ this.format = new MessageFormat( format );
+ }
+
+ @Override
+ public T map( T item )
+ {
+ logger.info( format.format( new String[]{ item.toString() } ) );
+ return item;
+ }
+ }
+
+ /**
+ * Track progress of transfer by emitting a log message in given intervals.
+ *
+ * If logger or format is null, then you need to override the logProgress to do something
+ *
+ * @param <T> type of items to be transferred
+ */
+ // START SNIPPET: progress
+ public static class ProgressLog<T>
+ implements Function<T, T>
+ {
+ private Counter<T> counter;
+ private Log<String> log;
+ private final long interval;
+
+ public ProgressLog( Logger logger, String format, long interval )
+ {
+ this.interval = interval;
+ if( logger != null && format != null )
+ {
+ log = new Log<>( logger, format );
+ }
+ counter = new Counter<>();
+ }
+
+ public ProgressLog( long interval )
+ {
+ this.interval = interval;
+ counter = new Counter<>();
+ }
+
+ @Override
+ public T map( T t )
+ {
+ counter.map( t );
+ if( counter.count % interval == 0 )
+ {
+ logProgress();
+ }
+ return t;
+ }
+
+ // Override this to do something other than logging the progress
+ protected void logProgress()
+ {
+ if( log != null )
+ {
+ log.map( counter.count + "" );
+ }
+ }
+ }
+ // END SNIPPET: progress
+}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/main/java/org/qi4j/io/package.html
----------------------------------------------------------------------
diff --git a/core/io/src/main/java/org/qi4j/io/package.html b/core/io/src/main/java/org/qi4j/io/package.html
new file mode 100644
index 0000000..aac8a54
--- /dev/null
+++ b/core/io/src/main/java/org/qi4j/io/package.html
@@ -0,0 +1,21 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<html>
+ <body>
+ <h2>I/O API.</h2>
+ </body>
+</html>
http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/test/java/org/apache/zest/io/InputOutputTest.java
----------------------------------------------------------------------
diff --git a/core/io/src/test/java/org/apache/zest/io/InputOutputTest.java b/core/io/src/test/java/org/apache/zest/io/InputOutputTest.java
deleted file mode 100644
index bc40f2e..0000000
--- a/core/io/src/test/java/org/apache/zest/io/InputOutputTest.java
+++ /dev/null
@@ -1,381 +0,0 @@
-/*
- * Copyright (c) 2010, Rickard Öberg. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.zest.io;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.io.Writer;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.rmi.RemoteException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.logging.Logger;
-import org.hamcrest.CoreMatchers;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.zest.functional.Function;
-import org.apache.zest.functional.Visitor;
-
-import static java.util.Arrays.asList;
-import static org.apache.zest.functional.Iterables.iterable;
-import static org.apache.zest.io.Inputs.text;
-import static org.apache.zest.io.Transforms.lock;
-import static org.apache.zest.test.util.Assume.assumeConnectivity;
-
-/**
- * Test Input/Output.
- */
-public class InputOutputTest
-{
- @Test
- public void testCopyFileNoAPI()
- throws IOException
- {
- File source = sourceFile();
- File destination = File.createTempFile( "test", ".txt" );
- destination.deleteOnExit();
-
- BufferedReader reader = new BufferedReader( new FileReader( source ) );
- long count = 0;
- try
- {
- BufferedWriter writer = new BufferedWriter( new FileWriter( destination ) );
- try
- {
- String line;
- while( ( line = reader.readLine() ) != null )
- {
- count++;
- writer.append( line ).append( '\n' );
- }
- writer.close();
- }
- catch( IOException e )
- {
- writer.close();
- destination.delete();
- }
- }
- finally
- {
- reader.close();
- }
- System.out.println( count );
- }
-
- @Test
- public void testInputOutput()
- throws IOException
- {
- URL source = getClass().getResource( "/iotest.txt" );
- File destination = File.createTempFile( "test", ".txt" );
- destination.deleteOnExit();
- text( source ).transferTo( Outputs.text( destination ) );
- }
-
- @Test
- public void testCopyFile()
- throws IOException
- {
- File source = sourceFile();
- File tempFile = File.createTempFile( "test", ".txt" );
- tempFile.deleteOnExit();
-
- Inputs.byteBuffer( source, 1024 ).transferTo( Outputs.byteBuffer( tempFile ) );
-
- Assert.assertThat( tempFile.length(), CoreMatchers.equalTo( source.length() ) );
- }
-
- @Test
- public void testCopyURL()
- throws IOException
- {
- assumeConnectivity( "www.google.com", 80 );
-
- File tempFile = File.createTempFile( "test", ".txt" );
- tempFile.deleteOnExit();
-
- Inputs.text( new URL( "http://www.google.com" ) ).transferTo( Outputs.text( tempFile ) );
-
-// Uncomment to check output Inputs.text( tempFile ).transferTo( Outputs.systemOut() );
- }
-
- @Test
- public void testCopyFileStreams()
- throws IOException
- {
- File source = sourceFile();
- File tempFile = File.createTempFile( "test", ".txt" );
- tempFile.deleteOnExit();
-
- Inputs.byteBuffer( new FileInputStream( source ), 1024 ).transferTo(
- Outputs.byteBuffer( new FileOutputStream( tempFile ) ) );
-
- Assert.assertThat( tempFile.length(), CoreMatchers.equalTo( source.length() ) );
- }
-
- @Test
- public void testLog()
- throws IOException
- {
- File source = sourceFile();
-
- text( source ).transferTo(
- Transforms.map( new Transforms.Log<String>( Logger.getLogger( getClass().getName() ), "Line: {0}" ),
- Outputs.<String>noop() ) );
- }
-
- @Test
- public void testProgressLog()
- throws Throwable
- {
- Integer[] data = new Integer[ 105 ];
- Arrays.fill( data, 42 );
-
- Inputs.iterable( iterable( data ) ).transferTo(
- Transforms.map(
- new Transforms.ProgressLog<Integer>(
- Logger.getLogger( InputOutputTest.class.getName() ), "Data transferred: {0}", 10 ),
- Outputs.<Integer>noop() ) );
- }
-
- @Test
- public void testTextInputsOutputs()
- throws IOException
- {
- File tempFile = File.createTempFile( "test", ".txt" );
- tempFile.deleteOnExit();
- File sourceFile = sourceFile();
- Transforms.Counter<String> stringCounter = new Transforms.Counter<>();
- text( sourceFile ).transferTo(
- Transforms.map(
- stringCounter,
- Transforms.map( new Function<String, String>()
- {
- public String map( String s )
- {
- System.out.println( s );
- return s;
- }
- }, Outputs.text( tempFile ) )
- )
- );
-
- Assert.assertThat( tempFile.length(), CoreMatchers.equalTo( sourceFile.length() ) );
- Assert.assertThat( stringCounter.count(), CoreMatchers.equalTo( 4L ) );
- }
-
- @Test
- public void testCombineInputs()
- throws IOException
- {
- File tempFile = File.createTempFile( "test", ".txt" );
- tempFile.deleteOnExit();
- File sourceFile = sourceFile();
- Transforms.Counter<String> stringCounter = new Transforms.Counter<>();
- Input<String, IOException> text1 = text( sourceFile );
- Input<String, IOException> text2 = text( sourceFile );
- List<Input<String, IOException>> list = createList( text1, text2 );
- Inputs.combine( list ).transferTo(
- Transforms.map(
- stringCounter,
- Transforms.map( new Function<String, String>()
- {
- public String map( String s )
- {
- System.out.println( s );
- return s;
- }
- }, Outputs.text( tempFile ) )
- )
- );
-
- Assert.assertThat( tempFile.length(), CoreMatchers.equalTo( sourceFile.length() * 2 ) );
- Assert.assertThat( stringCounter.count(), CoreMatchers.equalTo( 8L ) );
- }
-
- @SuppressWarnings( "unchecked" )
- private List<Input<String, IOException>> createList( Input<String, IOException> text1,
- Input<String, IOException> text2
- )
- {
- return asList( text1, text2 );
- }
-
- @Test( expected = IOException.class )
- public void testInputOutputOutputException()
- throws IOException
- {
-
- text( sourceFile() ).
- transferTo( writerOutput( new Writer()
- {
- @Override
- public void write( char[] cbuf, int off, int len )
- throws IOException
- {
- throw new IOException();
- }
-
- @Override
- public void flush()
- throws IOException
- {
- throw new IOException();
- }
-
- @Override
- public void close()
- throws IOException
- {
- throw new IOException();
- }
- } ) );
- }
-
- @Test( expected = RemoteException.class )
- public void testInputOutputInputException()
- throws IOException
- {
-
- Input<String, RemoteException> input = new Input<String, RemoteException>()
- {
- @Override
- public <OutputThrowableType extends Throwable> void transferTo( Output<? super String, OutputThrowableType> output )
- throws RemoteException, OutputThrowableType
- {
- output.receiveFrom( new Sender<String, RemoteException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super String, ReceiverThrowableType> receiverThrowableTypeReceiver )
- throws ReceiverThrowableType, RemoteException
- {
- throw new RemoteException();
- }
- } );
- }
- };
-
- input.transferTo(
- Transforms.map(
- new Transforms.Log<String>( Logger.getLogger( getClass().getName() ), "Line: {0}" ),
- Outputs.systemOut()
- )
- );
- }
-
- @Test
- public void testLock()
- throws IOException
- {
- Lock inputLock = new ReentrantLock();
- Lock outputLock = new ReentrantLock();
-
- URL source = getClass().getResource( "/iotest.txt" );
- File destination = File.createTempFile( "test", ".txt" );
- destination.deleteOnExit();
- lock( inputLock, text( source ) ).transferTo( lock( outputLock, Outputs.text( destination ) ) );
- }
-
- @Test
- public void testGenerics()
- {
- ArrayList<Object> objects = new ArrayList<>( 3 );
- Inputs.iterable( Arrays.asList( "Foo", "Bar", "Xyzzy" ) ).transferTo( Outputs.collection( objects ) );
-
- Inputs.iterable( objects ).transferTo( Outputs.systemOut() );
- }
-
- @Test
- public void testOutputstreamInput()
- throws Throwable
- {
- Input<ByteBuffer, IOException> input = Inputs.output( new Visitor<OutputStream, IOException>()
- {
- @Override
- public boolean visit( OutputStream visited )
- throws IOException
- {
- try( PrintWriter writer = new PrintWriter( visited ) )
- {
- writer.print( "Hello World!" );
- }
- return true;
- }
- }, 256 );
-
- input.transferTo( Transforms.map( new Transforms.ByteBuffer2String( Charset.defaultCharset() ), Outputs.systemOut() ) );
- input.transferTo( Transforms.map( new Transforms.ByteBuffer2String( Charset.defaultCharset() ), Outputs.systemOut() ) );
- }
-
- public Output<String, IOException> writerOutput( final Writer writer )
- {
- return new Output<String, IOException>()
- {
- @Override
- public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
- throws IOException, SenderThrowableType
- {
- // Here we initiate the transfer
- System.out.println( "Open output" );
- final StringBuilder builder = new StringBuilder();
- try
- {
- sender.sendTo( new Receiver<String, IOException>()
- {
- @Override
- public void receive( String item )
- throws IOException
- {
- System.out.println( "Receive input" );
-
- // Here we can do batch writes if needed
- builder.append( item ).append( "\n" );
- }
- } );
-
- // If transfer went well, do something with it
- writer.write( builder.toString() );
- writer.flush();
- System.out.println( "Output written" );
- }
- catch( IOException e )
- {
- // If transfer failed, potentially rollback writes
- System.out.println( "Input failed" );
- throw e;
- }
- }
- };
- }
-
- private File sourceFile()
- {
- String path = getClass().getResource( "/iotest.txt" ).getFile();
- return new File( path.replaceAll( "%20", " " ) );
- }
-}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/test/java/org/apache/zest/io/docsupport/IoDocs.java
----------------------------------------------------------------------
diff --git a/core/io/src/test/java/org/apache/zest/io/docsupport/IoDocs.java b/core/io/src/test/java/org/apache/zest/io/docsupport/IoDocs.java
deleted file mode 100644
index 9ff74a2..0000000
--- a/core/io/src/test/java/org/apache/zest/io/docsupport/IoDocs.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.zest.io.docsupport;
-
-import java.io.File;
-import java.io.IOException;
-import org.apache.zest.io.Inputs;
-import org.apache.zest.io.Outputs;
-
-// START SNIPPET: io2
-import org.apache.zest.io.Transforms.Counter;
-import static org.apache.zest.io.Transforms.map;
-// END SNIPPET: io2
-
-public class IoDocs
-{
- public static void main( String[] args )
- throws IOException
- {
- {
-// START SNIPPET: io1
- File source = new File( "source.txt" );
- File destination = new File( "destination.txt" );
- Inputs.text( source ).transferTo( Outputs.text( destination ) );
-// END SNIPPET: io1
- }
- {
-// START SNIPPET: io2
- File source = new File( "source.txt" );
- File destination = new File( "destination.txt" );
- Counter<String> counter = new Counter<String>();
- Inputs.text( source ).transferTo( map(counter, Outputs.text(destination) ));
- System.out.println( "Lines: " + counter.count() );
-// END SNIPPET: io2
- }
- }
-}