You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@polygene.apache.org by pa...@apache.org on 2016/12/03 11:20:10 UTC

[1/2] zest-java git commit: entitystores: replace usage of core/io with streams

Repository: zest-java
Updated Branches:
  refs/heads/replace-io-by-streams [created] eb4e31a97


http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-sql/src/main/java/org/apache/zest/entitystore/sql/SQLEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-sql/src/main/java/org/apache/zest/entitystore/sql/SQLEntityStoreMixin.java b/extensions/entitystore-sql/src/main/java/org/apache/zest/entitystore/sql/SQLEntityStoreMixin.java
index a52b526..fa66da2 100644
--- a/extensions/entitystore-sql/src/main/java/org/apache/zest/entitystore/sql/SQLEntityStoreMixin.java
+++ b/extensions/entitystore-sql/src/main/java/org/apache/zest/entitystore/sql/SQLEntityStoreMixin.java
@@ -34,8 +34,13 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Spliterator;
+import java.util.Spliterators;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import org.apache.zest.api.common.Optional;
 import org.apache.zest.api.common.QualifiedName;
 import org.apache.zest.api.entity.EntityDescriptor;
@@ -57,11 +62,6 @@ import org.apache.zest.entitystore.sql.internal.DatabaseSQLService;
 import org.apache.zest.entitystore.sql.internal.DatabaseSQLService.EntityValueResult;
 import org.apache.zest.entitystore.sql.internal.SQLEntityState;
 import org.apache.zest.entitystore.sql.internal.SQLEntityState.DefaultSQLEntityState;
-import org.apache.zest.functional.Visitor;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.library.sql.common.SQLUtil;
 import org.apache.zest.spi.ZestSPI;
 import org.apache.zest.spi.entity.EntityState;
@@ -272,7 +272,8 @@ public class SQLEntityStoreMixin
                                        EntityDescriptor entityDescriptor
     )
     {
-        return new DefaultSQLEntityState( new DefaultEntityState( unitOfWork.currentTime(), entityRef, entityDescriptor ) );
+        return new DefaultSQLEntityState(
+            new DefaultEntityState( unitOfWork.currentTime(), entityRef, entityDescriptor ) );
     }
 
     @Override
@@ -282,77 +283,56 @@ public class SQLEntityStoreMixin
     }
 
     @Override
-    public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module )
+    public Stream<EntityState> entityStates( final ModuleDescriptor module )
     {
-        return new Input<EntityState, EntityStoreException>()
+        try
         {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output )
-                throws EntityStoreException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<EntityState, EntityStoreException>()
+            Connection connection = database.getConnection();
+            PreparedStatement ps = database.prepareGetAllEntitiesStatement( connection );
+            database.populateGetAllEntitiesStatement( ps );
+            ResultSet rs = ps.executeQuery();
+            return StreamSupport.stream(
+                new Spliterators.AbstractSpliterator<EntityState>( Long.MAX_VALUE, Spliterator.ORDERED )
                 {
                     @Override
-                    public <RecThrowableType extends Throwable> void sendTo( final Receiver<? super EntityState, RecThrowableType> receiver )
-                        throws RecThrowableType, EntityStoreException
+                    public boolean tryAdvance( final Consumer<? super EntityState> action )
                     {
-                        queryAllEntities( module, visited -> {
-                            try
-                            {
-                                receiver.receive( visited );
-                            }
-                            catch( Throwable receiverThrowableType )
-                            {
-                                throw new SQLException( receiverThrowableType );
-                            }
+                        try
+                        {
+                            if( !rs.next() ) { return false; }
+                            EntityState entityState = readEntityState( module,
+                                                                       database.getEntityValue( rs ).getReader() );
+                            action.accept( entityState );
                             return true;
-                        } );
+                        }
+                        catch( SQLException ex )
+                        {
+                            SQLUtil.closeQuietly( rs, ex );
+                            SQLUtil.closeQuietly( ps, ex );
+                            SQLUtil.closeQuietly( connection, ex );
+                            throw new EntityStoreException( ex );
+                        }
                     }
-                } );
-            }
-        };
-    }
-
-    private void queryAllEntities( ModuleDescriptor module, EntityStatesVisitor entityStatesVisitor )
-    {
-        Connection connection = null;
-        PreparedStatement ps = null;
-        ResultSet rs = null;
-        try
-        {
-            connection = database.getConnection();
-            ps = database.prepareGetAllEntitiesStatement( connection );
-            database.populateGetAllEntitiesStatement( ps );
-            rs = ps.executeQuery();
-            while( rs.next() )
-            {
-                DefaultEntityState entityState = readEntityState( module, database.getEntityValue( rs ).getReader() );
-                if( !entityStatesVisitor.visit( entityState ) )
+                },
+                false
+            ).onClose(
+                () ->
                 {
-                    return;
+                    SQLUtil.closeQuietly( rs );
+                    SQLUtil.closeQuietly( ps );
+                    SQLUtil.closeQuietly( connection );
                 }
-            }
+            );
         }
         catch( SQLException ex )
         {
             throw new EntityStoreException( ex );
         }
-        finally
-        {
-            SQLUtil.closeQuietly( rs );
-            SQLUtil.closeQuietly( ps );
-            SQLUtil.closeQuietly( connection );
-        }
-    }
-
-    private interface EntityStatesVisitor
-        extends Visitor<EntityState, SQLException>
-    {
     }
 
     protected Identity newUnitOfWorkId()
     {
-        return identityGenerator.generate(EntityStore.class);
+        return identityGenerator.generate( EntityStore.class );
     }
 
     protected DefaultEntityState readEntityState( ModuleDescriptor module, Reader entityState )
@@ -364,7 +344,7 @@ public class SQLEntityStoreMixin
             final EntityStatus[] status = { EntityStatus.LOADED };
 
             String version = jsonObject.getString( JSONKeys.VERSION );
-            Instant modified = Instant.ofEpochMilli(jsonObject.getLong( JSONKeys.MODIFIED ));
+            Instant modified = Instant.ofEpochMilli( jsonObject.getLong( JSONKeys.MODIFIED ) );
             String identity = jsonObject.getString( JSONKeys.IDENTITY );
 
             // Check if version is correct
@@ -398,102 +378,117 @@ public class SQLEntityStoreMixin
 
             Map<QualifiedName, Object> properties = new HashMap<>();
             JSONObject props = jsonObject.getJSONObject( JSONKeys.PROPERTIES );
-            entityDescriptor.state().properties().forEach( propertyDescriptor -> {
-                Object jsonValue;
-                try
+            entityDescriptor.state().properties().forEach(
+                propertyDescriptor ->
                 {
-                    jsonValue = props.get( propertyDescriptor.qualifiedName().name() );
-                    if( JSONObject.NULL.equals( jsonValue ) )
+                    Object jsonValue;
+                    try
                     {
-                        properties.put( propertyDescriptor.qualifiedName(), null );
+                        jsonValue = props.get(
+                            propertyDescriptor.qualifiedName().name() );
+                        if( JSONObject.NULL.equals( jsonValue ) )
+                        {
+                            properties.put( propertyDescriptor.qualifiedName(), null );
+                        }
+                        else
+                        {
+                            Object value = valueSerialization.deserialize( module,
+                                                                           propertyDescriptor.valueType(),
+                                                                           jsonValue.toString() );
+                            properties.put( propertyDescriptor.qualifiedName(), value );
+                        }
                     }
-                    else
+                    catch( JSONException e )
                     {
-                        Object value = valueSerialization.deserialize( module, propertyDescriptor.valueType(), jsonValue
-                            .toString() );
-                        properties.put( propertyDescriptor.qualifiedName(), value );
+                        // Value not found, default it
+                        Object initialValue = propertyDescriptor.initialValue( module );
+                        properties.put( propertyDescriptor.qualifiedName(), initialValue );
+                        status[ 0 ] = EntityStatus.UPDATED;
                     }
                 }
-                catch( JSONException e )
-                {
-                    // Value not found, default it
-                    Object initialValue = propertyDescriptor.initialValue( module );
-                    properties.put( propertyDescriptor.qualifiedName(), initialValue );
-                    status[ 0 ] = EntityStatus.UPDATED;
-                }
-            } );
+            );
 
             Map<QualifiedName, EntityReference> associations = new HashMap<>();
             JSONObject assocs = jsonObject.getJSONObject( JSONKeys.ASSOCIATIONS );
-            entityDescriptor.state().associations().forEach( associationType -> {
-                try
+            entityDescriptor.state().associations().forEach(
+                associationType ->
                 {
-                    Object jsonValue = assocs.get( associationType.qualifiedName().name() );
-                    EntityReference value = jsonValue == JSONObject.NULL ? null : EntityReference.parseEntityReference(
-                        (String) jsonValue );
-                    associations.put( associationType.qualifiedName(), value );
-                }
-                catch( JSONException e )
-                {
-                    // Association not found, default it to null
-                    associations.put( associationType.qualifiedName(), null );
-                    status[ 0 ] = EntityStatus.UPDATED;
+                    try
+                    {
+                        Object jsonValue = assocs.get( associationType.qualifiedName().name() );
+                        EntityReference value = jsonValue == JSONObject.NULL
+                                                ? null
+                                                : EntityReference.parseEntityReference( (String) jsonValue );
+                        associations.put( associationType.qualifiedName(), value );
+                    }
+                    catch( JSONException e )
+                    {
+                        // Association not found, default it to null
+                        associations.put( associationType.qualifiedName(), null );
+                        status[ 0 ] = EntityStatus.UPDATED;
+                    }
                 }
-            } );
+            );
 
             JSONObject manyAssocs = jsonObject.getJSONObject( JSONKeys.MANY_ASSOCIATIONS );
             Map<QualifiedName, List<EntityReference>> manyAssociations = new HashMap<>();
-            entityDescriptor.state().manyAssociations().forEach( manyAssociationType -> {
-                List<EntityReference> references = new ArrayList<>();
-                try
+            entityDescriptor.state().manyAssociations().forEach(
+                manyAssociationType ->
                 {
-                    JSONArray jsonValues = manyAssocs.getJSONArray( manyAssociationType.qualifiedName().name() );
-                    for( int i = 0; i < jsonValues.length(); i++ )
+                    List<EntityReference> references = new ArrayList<>();
+                    try
+                    {
+                        JSONArray jsonValues = manyAssocs.getJSONArray( manyAssociationType.qualifiedName().name() );
+                        for( int i = 0; i < jsonValues.length(); i++ )
+                        {
+                            Object jsonValue = jsonValues.getString( i );
+                            EntityReference value = jsonValue == JSONObject.NULL
+                                                    ? null
+                                                    : EntityReference.parseEntityReference( (String) jsonValue );
+                            references.add( value );
+                        }
+                        manyAssociations.put( manyAssociationType.qualifiedName(), references );
+                    }
+                    catch( JSONException e )
                     {
-                        Object jsonValue = jsonValues.getString( i );
-                        EntityReference value = jsonValue == JSONObject.NULL ? null : EntityReference.parseEntityReference(
-                            (String) jsonValue );
-                        references.add( value );
+                        // ManyAssociation not found, default to empty one
+                        manyAssociations.put( manyAssociationType.qualifiedName(), references );
                     }
-                    manyAssociations.put( manyAssociationType.qualifiedName(), references );
-                }
-                catch( JSONException e )
-                {
-                    // ManyAssociation not found, default to empty one
-                    manyAssociations.put( manyAssociationType.qualifiedName(), references );
-                }
-            } );
+                } );
 
             JSONObject namedAssocs = jsonObject.has( JSONKeys.NAMED_ASSOCIATIONS )
                                      ? jsonObject.getJSONObject( JSONKeys.NAMED_ASSOCIATIONS )
                                      : new JSONObject();
             Map<QualifiedName, Map<String, EntityReference>> namedAssociations = new HashMap<>();
-            entityDescriptor.state().namedAssociations().forEach( namedAssociationType -> {
-                Map<String, EntityReference> references = new LinkedHashMap<>();
-                try
+            entityDescriptor.state().namedAssociations().forEach(
+                namedAssociationType ->
                 {
-                    JSONObject jsonValues = namedAssocs.getJSONObject( namedAssociationType.qualifiedName().name() );
-                    JSONArray names = jsonValues.names();
-                    if( names != null )
+                    Map<String, EntityReference> references = new LinkedHashMap<>();
+                    try
                     {
-                        for( int idx = 0; idx < names.length(); idx++ )
+                        JSONObject jsonValues = namedAssocs.getJSONObject( namedAssociationType.qualifiedName().name() );
+                        JSONArray names = jsonValues.names();
+                        if( names != null )
                         {
-                            String name = names.getString( idx );
-                            String jsonValue = jsonValues.getString( name );
-                            references.put( name, EntityReference.parseEntityReference( jsonValue ) );
+                            for( int idx = 0; idx < names.length(); idx++ )
+                            {
+                                String name = names.getString( idx );
+                                String jsonValue = jsonValues.getString( name );
+                                references.put( name, EntityReference.parseEntityReference( jsonValue ) );
+                            }
                         }
+                        namedAssociations.put( namedAssociationType.qualifiedName(), references );
                     }
-                    namedAssociations.put( namedAssociationType.qualifiedName(), references );
-                }
-                catch( JSONException e )
-                {
-                    // NamedAssociation not found, default to empty one
-                    namedAssociations.put( namedAssociationType.qualifiedName(), references );
-                }
-            } );
+                    catch( JSONException e )
+                    {
+                        // NamedAssociation not found, default to empty one
+                        namedAssociations.put( namedAssociationType.qualifiedName(), references );
+                    }
+                } );
 
             return new DefaultEntityState( version, modified,
-                                           EntityReference.parseEntityReference( identity ), status[ 0 ], entityDescriptor,
+                                           EntityReference.parseEntityReference( identity ), status[ 0 ],
+                                           entityDescriptor,
                                            properties, associations, manyAssociations, namedAssociations );
         }
         catch( JSONException e )
@@ -507,7 +502,7 @@ public class SQLEntityStoreMixin
         throws IOException
     {
         JSONObject jsonObject;
-        try (Reader reader = getValue( EntityReference.parseEntityReference( id ) ).getReader())
+        try( Reader reader = getValue( EntityReference.parseEntityReference( id ) ).getReader() )
         {
             jsonObject = new JSONObject( new JSONTokener( reader ) );
         }
@@ -553,58 +548,65 @@ public class SQLEntityStoreMixin
         try
         {
             JSONWriter json = new JSONWriter( writer );
-            JSONWriter properties = json.object().
-                key( JSONKeys.IDENTITY ).value( state.entityReference().identity().toString() ).
-                key( JSONKeys.APPLICATION_VERSION ).value( application.version() ).
-                key( JSONKeys.TYPE ).value( state.entityDescriptor().types().findFirst().get().getName() ).
-                key( JSONKeys.VERSION ).value( version ).
-                key( JSONKeys.MODIFIED ).value( state.lastModified().toEpochMilli() ).
-                key( JSONKeys.PROPERTIES ).object();
-
-            state.entityDescriptor().state().properties().forEach( persistentProperty -> {
-                try
+            JSONWriter properties = json.object()
+                                        .key( JSONKeys.IDENTITY )
+                                        .value( state.entityReference().identity().toString() )
+                                        .key( JSONKeys.APPLICATION_VERSION )
+                                        .value( application.version() )
+                                        .key( JSONKeys.TYPE )
+                                        .value( state.entityDescriptor().types().findFirst().get().getName() )
+                                        .key( JSONKeys.VERSION )
+                                        .value( version )
+                                        .key( JSONKeys.MODIFIED )
+                                        .value( state.lastModified().toEpochMilli() )
+                                        .key( JSONKeys.PROPERTIES )
+                                        .object();
+
+            state.entityDescriptor().state().properties().forEach(
+                persistentProperty ->
                 {
-                    Object value = state.properties().get( persistentProperty.qualifiedName() );
-                    json.key( persistentProperty.qualifiedName().name() );
-                    if( value == null || ValueType.isPrimitiveValue( value ) )
+                    try
                     {
-                        json.value( value );
-                    }
-                    else
-                    {
-                        String serialized = valueSerialization.serialize( value );
-                        if( serialized.startsWith( "{" ) )
+                        Object value = state.properties().get( persistentProperty.qualifiedName() );
+                        json.key( persistentProperty.qualifiedName().name() );
+                        if( value == null || ValueType.isPrimitiveValue( value ) )
                         {
-                            json.value( new JSONObject( serialized ) );
-                        }
-                        else if( serialized.startsWith( "[" ) )
-                        {
-                            json.value( new JSONArray( serialized ) );
+                            json.value( value );
                         }
                         else
                         {
-                            json.value( serialized );
+                            String serialized = valueSerialization.serialize( value );
+                            if( serialized.startsWith( "{" ) )
+                            {
+                                json.value( new JSONObject( serialized ) );
+                            }
+                            else if( serialized.startsWith( "[" ) )
+                            {
+                                json.value( new JSONArray( serialized ) );
+                            }
+                            else
+                            {
+                                json.value( serialized );
+                            }
                         }
                     }
-                }
-                catch( JSONException e )
-                {
-                    throw new EntityStoreException( "Could not store EntityState", e );
-                }
-            } );
+                    catch( JSONException e )
+                    {
+                        throw new EntityStoreException(
+                            "Could not store EntityState", e );
+                    }
+                } );
 
             JSONWriter associations = properties.endObject().key( JSONKeys.ASSOCIATIONS ).object();
-            for( Map.Entry<QualifiedName, EntityReference> stateNameEntityReferenceEntry : state.associations()
-                .entrySet() )
+            for( Map.Entry<QualifiedName, EntityReference> stateNameEntityRefEntry : state.associations().entrySet() )
             {
-                EntityReference value = stateNameEntityReferenceEntry.getValue();
-                associations.key( stateNameEntityReferenceEntry.getKey().name() ).
-                    value( value != null ? value.identity().toString() : null );
+                EntityReference value = stateNameEntityRefEntry.getValue();
+                associations.key( stateNameEntityRefEntry.getKey().name() )
+                            .value( value != null ? value.identity().toString() : null );
             }
 
             JSONWriter manyAssociations = associations.endObject().key( JSONKeys.MANY_ASSOCIATIONS ).object();
-            for( Map.Entry<QualifiedName, List<EntityReference>> stateNameListEntry : state.manyAssociations()
-                .entrySet() )
+            for( Map.Entry<QualifiedName, List<EntityReference>> stateNameListEntry : state.manyAssociations().entrySet() )
             {
                 JSONWriter assocs = manyAssociations.key( stateNameListEntry.getKey().name() ).array();
                 for( EntityReference entityReference : stateNameListEntry.getValue() )
@@ -615,8 +617,7 @@ public class SQLEntityStoreMixin
             }
 
             JSONWriter namedAssociations = manyAssociations.endObject().key( JSONKeys.NAMED_ASSOCIATIONS ).object();
-            for( Map.Entry<QualifiedName, Map<String, EntityReference>> stateNameMapEntry : state.namedAssociations()
-                .entrySet() )
+            for( Map.Entry<QualifiedName, Map<String, EntityReference>> stateNameMapEntry : state.namedAssociations().entrySet() )
             {
                 JSONWriter assocs = namedAssociations.key( stateNameMapEntry.getKey().name() ).object();
                 for( Map.Entry<String, EntityReference> entry : stateNameMapEntry.getValue().entrySet() )

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/migration/src/test/java/org/apache/zest/migration/MigrationTest.java
----------------------------------------------------------------------
diff --git a/extensions/migration/src/test/java/org/apache/zest/migration/MigrationTest.java b/extensions/migration/src/test/java/org/apache/zest/migration/MigrationTest.java
index 1da1f7b..7f8cb54 100644
--- a/extensions/migration/src/test/java/org/apache/zest/migration/MigrationTest.java
+++ b/extensions/migration/src/test/java/org/apache/zest/migration/MigrationTest.java
@@ -19,26 +19,18 @@
  */
 package org.apache.zest.migration;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.StringReader;
-import org.apache.zest.api.identity.Identity;
-import org.apache.zest.bootstrap.unitofwork.DefaultUnitOfWorkAssembler;
-import org.hamcrest.CoreMatchers;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.junit.Test;
+import java.util.List;
+import java.util.stream.Stream;
 import org.apache.zest.api.activation.ActivationException;
+import org.apache.zest.api.identity.Identity;
 import org.apache.zest.api.service.importer.NewObjectImporter;
 import org.apache.zest.api.unitofwork.UnitOfWork;
 import org.apache.zest.api.unitofwork.UnitOfWorkCompletionException;
 import org.apache.zest.bootstrap.AssemblyException;
 import org.apache.zest.bootstrap.ModuleAssembly;
 import org.apache.zest.bootstrap.SingletonAssembler;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
+import org.apache.zest.bootstrap.unitofwork.DefaultUnitOfWorkAssembler;
 import org.apache.zest.migration.assembly.EntityMigrationOperation;
 import org.apache.zest.migration.assembly.MigrationBuilder;
 import org.apache.zest.migration.assembly.MigrationOperation;
@@ -47,7 +39,12 @@ import org.apache.zest.spi.entitystore.helpers.JSONKeys;
 import org.apache.zest.spi.entitystore.helpers.StateStore;
 import org.apache.zest.test.AbstractZestTest;
 import org.apache.zest.test.EntityTestAssembler;
+import org.hamcrest.CoreMatchers;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.junit.Test;
 
+import static java.util.stream.Collectors.toList;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -106,7 +103,7 @@ public class MigrationTest
     {
         Identity id;
         // Set up version 1
-        StringInputOutput data_v1 = new StringInputOutput();
+        List<String> data_v1;
         {
             SingletonAssembler v1 = new SingletonAssembler()
             {
@@ -130,11 +127,14 @@ public class MigrationTest
             BackupRestore backupRestore = v1.module()
                 .findService( BackupRestore.class )
                 .get();
-            backupRestore.backup().transferTo( data_v1 );
+            try( Stream<String> backup = backupRestore.backup() )
+            {
+                data_v1 = backup.collect( toList() );
+            }
         }
 
         // Set up version 1.1
-        StringInputOutput data_v1_1 = new StringInputOutput();
+        List<String> data_v1_1;
         {
             SingletonAssembler v1_1 = new SingletonAssembler()
             {
@@ -148,7 +148,7 @@ public class MigrationTest
             };
 
             BackupRestore testData = v1_1.module().findService( BackupRestore.class ).get();
-            data_v1.transferTo( testData.restore() );
+            testData.restore( data_v1.stream() );
 
             UnitOfWork uow = v1_1.module().unitOfWorkFactory().newUnitOfWork();
             TestEntity1_1 entity = uow.get( TestEntity1_1.class, id );
@@ -157,7 +157,10 @@ public class MigrationTest
             assertThat( "Association has been renamed", entity.newFooAssoc().get(), CoreMatchers.equalTo( entity ) );
             uow.complete();
 
-            testData.backup().transferTo( data_v1_1 );
+            try( Stream<String> backup = testData.backup() )
+            {
+                data_v1_1 = backup.collect( toList() );
+            }
         }
 
         // Set up version 2.0
@@ -177,7 +180,7 @@ public class MigrationTest
 
             // Test migration from 1.0 -> 2.0
             {
-                data_v1.transferTo( testData.restore() );
+                testData.restore( data_v1.stream() );
                 UnitOfWork uow = v2_0.module().unitOfWorkFactory().newUnitOfWork();
                 TestEntity2_0 entity = uow.get( TestEntity2_0.class, id );
                 assertThat( "Property has been created", entity.bar().get(), CoreMatchers.equalTo( "Some value" ) );
@@ -202,11 +205,11 @@ public class MigrationTest
             };
 
             BackupRestore testData = v3_0.module().findService( BackupRestore.class ).get();
-            data_v1_1.transferTo( testData.restore() );
+            testData.restore( data_v1_1.stream() );
 
             // Test migration from 1.0 -> 3.0
             {
-                data_v1.transferTo( testData.restore() );
+                testData.restore( data_v1.stream() );
                 UnitOfWork uow = v3_0.module().unitOfWorkFactory().newUnitOfWork();
                 org.apache.zest.migration.moved.TestEntity2_0 entity = uow.get( org.apache.zest.migration.moved.TestEntity2_0.class, id );
                 uow.complete();
@@ -258,51 +261,4 @@ public class MigrationTest
             System.out.println( msg );
         }
     }
-
-    private static class StringInputOutput
-        implements Output<String, IOException>, Input<String, IOException>
-    {
-        final StringBuilder builder = new StringBuilder();
-
-        @Override
-        public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
-            throws IOException, SenderThrowableType
-        {
-            sender.sendTo((Receiver<String, IOException>) item -> builder.append( item ).append( "\n" ));
-        }
-
-        @Override
-        public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
-            throws IOException, ReceiverThrowableType
-        {
-            output.receiveFrom( new Sender<String, IOException>()
-            {
-                @Override
-                public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super String, ReceiverThrowableType> receiver )
-                    throws ReceiverThrowableType, IOException
-                {
-                    BufferedReader reader = new BufferedReader( new StringReader( builder.toString() ) );
-                    String line;
-                    try
-                    {
-                        while( ( line = reader.readLine() ) != null )
-                        {
-                            receiver.receive( line );
-                        }
-                    }
-                    finally
-                    {
-                        reader.close();
-                    }
-                }
-            } );
-        }
-
-        @Override
-        public String toString()
-        {
-            return builder.toString();
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/reindexer/src/main/java/org/apache/zest/index/reindexer/internal/ReindexerMixin.java
----------------------------------------------------------------------
diff --git a/extensions/reindexer/src/main/java/org/apache/zest/index/reindexer/internal/ReindexerMixin.java b/extensions/reindexer/src/main/java/org/apache/zest/index/reindexer/internal/ReindexerMixin.java
index c3a3af4..f11dc77 100644
--- a/extensions/reindexer/src/main/java/org/apache/zest/index/reindexer/internal/ReindexerMixin.java
+++ b/extensions/reindexer/src/main/java/org/apache/zest/index/reindexer/internal/ReindexerMixin.java
@@ -21,7 +21,7 @@
 package org.apache.zest.index.reindexer.internal;
 
 import java.util.ArrayList;
-import org.apache.zest.api.common.QualifiedName;
+import java.util.stream.Stream;
 import org.apache.zest.api.configuration.Configuration;
 import org.apache.zest.api.identity.HasIdentity;
 import org.apache.zest.api.injection.scope.Service;
@@ -31,9 +31,6 @@ import org.apache.zest.api.service.ServiceReference;
 import org.apache.zest.api.structure.ModuleDescriptor;
 import org.apache.zest.index.reindexer.Reindexer;
 import org.apache.zest.index.reindexer.ReindexerConfiguration;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.entity.EntityState;
 import org.apache.zest.spi.entitystore.EntityStore;
 import org.apache.zest.spi.entitystore.StateChangeListener;
@@ -67,52 +64,43 @@ public class ReindexerMixin
         {
             loadValue = 50;
         }
-        new ReindexerOutput( loadValue ).reindex( store );
+        ReindexerHelper helper = new ReindexerHelper( loadValue );
+        helper.reindex( store );
     }
 
-    private class ReindexerOutput
-        implements Output<EntityState, RuntimeException>, Receiver<EntityState, RuntimeException>
+    private class ReindexerHelper
     {
         private int count;
         private int loadValue;
         private ArrayList<EntityState> states;
 
-        public ReindexerOutput( Integer loadValue )
+        private ReindexerHelper( int loadValue )
         {
             this.loadValue = loadValue;
             states = new ArrayList<>();
         }
 
-        public void reindex( EntityStore store )
+        private void reindex( EntityStore store )
         {
-
-            store.entityStates( module ).transferTo( this );
-            reindexState();
-        }
-
-        @Override
-        public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends EntityState, SenderThrowableType> sender )
-            throws RuntimeException, SenderThrowableType
-        {
-            sender.sendTo( this );
-            reindexState();
-        }
-
-        @Override
-        public void receive( EntityState item )
-            throws RuntimeException
-        {
-            count++;
-            item.setPropertyValue( HasIdentity.IDENTITY_STATE_NAME, item.entityReference().identity() );
-            states.add( item );
-
-            if( states.size() >= loadValue )
+            try( Stream<EntityState> entityStates = store.entityStates( module ) )
             {
-                reindexState();
+                entityStates
+                    .forEach( entityState ->
+                              {
+                                  count++;
+                                  entityState.setPropertyValue( HasIdentity.IDENTITY_STATE_NAME,
+                                                                entityState.entityReference().identity() );
+                                  states.add( entityState );
+                                  if( states.size() >= loadValue )
+                                  {
+                                      reindexState();
+                                  }
+                              } );
             }
+            reindexState();
         }
 
-        public void reindexState()
+        private void reindexState()
         {
             for( ServiceReference<StateChangeListener> listener : listeners )
             {

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/libraries/logging/src/test/java/org/apache/zest/library/logging/DebuggingTest.java
----------------------------------------------------------------------
diff --git a/libraries/logging/src/test/java/org/apache/zest/library/logging/DebuggingTest.java b/libraries/logging/src/test/java/org/apache/zest/library/logging/DebuggingTest.java
index ae75fb4..1fccdde 100644
--- a/libraries/logging/src/test/java/org/apache/zest/library/logging/DebuggingTest.java
+++ b/libraries/logging/src/test/java/org/apache/zest/library/logging/DebuggingTest.java
@@ -20,9 +20,8 @@
 
 package org.apache.zest.library.logging;
 
-import java.util.function.Function;
+import java.util.stream.Stream;
 import org.apache.zest.api.identity.Identity;
-import org.junit.Test;
 import org.apache.zest.api.injection.scope.This;
 import org.apache.zest.api.mixin.Mixins;
 import org.apache.zest.api.service.ServiceComposite;
@@ -31,8 +30,6 @@ import org.apache.zest.api.unitofwork.UnitOfWork;
 import org.apache.zest.api.unitofwork.UnitOfWorkCompletionException;
 import org.apache.zest.bootstrap.AssemblyException;
 import org.apache.zest.bootstrap.ModuleAssembly;
-import org.apache.zest.io.Outputs;
-import org.apache.zest.io.Transforms;
 import org.apache.zest.library.logging.debug.Debug;
 import org.apache.zest.library.logging.debug.DebugConcern;
 import org.apache.zest.library.logging.debug.records.ServiceDebugRecordEntity;
@@ -42,6 +39,7 @@ import org.apache.zest.spi.entity.EntityState;
 import org.apache.zest.spi.entitystore.EntityStore;
 import org.apache.zest.test.AbstractZestTest;
 import org.apache.zest.test.EntityTestAssembler;
+import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
@@ -76,19 +74,18 @@ public class DebuggingTest
             assertEquals( message, "Hello!" );
             EntityStore es = serviceFinder.findService( EntityStore.class ).get();
             final Identity[] result = new Identity[1];
-            es.entityStates( module ).transferTo( Transforms.map( new Function<EntityState, EntityState>()
-                    {
-                        public EntityState apply( EntityState entityState )
-                        {
-                            if( ServiceDebugRecordEntity.class.getName()
-                                    .equals( entityState.entityDescriptor().types().findFirst().get().getName() ) )
-                            {
-                                result[0] = entityState.entityReference().identity();
-                            }
-
-                            return entityState;
-                        }
-                    }, Outputs.<EntityState>noop() ));
+            try( Stream<EntityState> entityStates = es.entityStates( module ) )
+            {
+                entityStates
+                    .forEach( entityState ->
+                              {
+                                  if( ServiceDebugRecordEntity.class.getName().equals(
+                                      entityState.entityDescriptor().types().findFirst().get().getName() ) )
+                                  {
+                                      result[ 0 ] = entityState.entityReference().identity();
+                                  }
+                              } );
+            }
 
             ServiceDebugRecordEntity debugEntry = uow.get( ServiceDebugRecordEntity.class, result[ 0 ] );
             String mess = debugEntry.message().get();

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/libraries/sql/src/main/java/org/apache/zest/library/sql/common/SQLUtil.java
----------------------------------------------------------------------
diff --git a/libraries/sql/src/main/java/org/apache/zest/library/sql/common/SQLUtil.java b/libraries/sql/src/main/java/org/apache/zest/library/sql/common/SQLUtil.java
index 12a2bd4..e87c9c6 100644
--- a/libraries/sql/src/main/java/org/apache/zest/library/sql/common/SQLUtil.java
+++ b/libraries/sql/src/main/java/org/apache/zest/library/sql/common/SQLUtil.java
@@ -26,43 +26,94 @@ import java.sql.Statement;
 
 public class SQLUtil
 {
-
     public static void closeQuietly( ResultSet resultSet )
     {
-        if ( resultSet != null ) {
-            try {
+        closeQuietly( resultSet, null );
+    }
+
+    public static void closeQuietly( ResultSet resultSet, Throwable originalException )
+    {
+        if( resultSet != null )
+        {
+            try
+            {
                 resultSet.close();
-            } catch ( SQLException ignored ) {
+            }
+            catch( SQLException ignored )
+            {
+                if( originalException != null )
+                {
+                    originalException.addSuppressed( ignored );
+                }
             }
         }
     }
 
     public static void closeQuietly( Statement select )
     {
-        if ( select != null ) {
-            try {
+        closeQuietly( select, null );
+    }
+
+    public static void closeQuietly( Statement select, Throwable originalException )
+    {
+        if( select != null )
+        {
+            try
+            {
                 select.close();
-            } catch ( SQLException ignored ) {
+            }
+            catch( SQLException ignored )
+            {
+                if( originalException != null )
+                {
+                    originalException.addSuppressed( ignored );
+                }
             }
         }
     }
 
     public static void closeQuietly( Connection connection )
     {
-        if ( connection != null ) {
-            try {
+        closeQuietly( connection, null );
+    }
+
+    public static void closeQuietly( Connection connection, Throwable originalException )
+    {
+        if( connection != null )
+        {
+            try
+            {
                 connection.close();
-            } catch ( SQLException ignored ) {
+            }
+            catch( SQLException ignored )
+            {
+                if( originalException != null )
+                {
+                    originalException.addSuppressed( ignored );
+                }
             }
         }
     }
 
     public static void rollbackQuietly( Connection connection )
     {
-        if ( connection != null ) {
-            try {
+        rollbackQuietly( connection, null );
+    }
+
+    public static void rollbackQuietly( Connection connection, Throwable originalException )
+    {
+        if( connection != null )
+        {
+            try
+            {
                 connection.rollback();
-            } catch ( SQLException ignored ) {
+            }
+            catch( SQLException ignored )
+            {
+                if( originalException != null )
+                {
+                    originalException.addSuppressed( ignored );
+                }
             }
         }
     }
@@ -70,5 +121,4 @@ public class SQLUtil
     private SQLUtil()
     {
     }
-
 }


[2/2] zest-java git commit: entitystores: replace usage of core/io with streams

Posted by pa...@apache.org.
entitystores: replace usage of core/io with streams


Project: http://git-wip-us.apache.org/repos/asf/zest-java/repo
Commit: http://git-wip-us.apache.org/repos/asf/zest-java/commit/eb4e31a9
Tree: http://git-wip-us.apache.org/repos/asf/zest-java/tree/eb4e31a9
Diff: http://git-wip-us.apache.org/repos/asf/zest-java/diff/eb4e31a9

Branch: refs/heads/replace-io-by-streams
Commit: eb4e31a97a92609feb2e7bb18d135de432d511db
Parents: 7ac5338
Author: Paul Merlin <pa...@apache.org>
Authored: Sat Dec 3 12:12:22 2016 +0100
Committer: Paul Merlin <pa...@apache.org>
Committed: Sat Dec 3 12:12:22 2016 +0100

----------------------------------------------------------------------
 .../memory/MemoryMapEntityStoreMixin.java       | 107 ++----
 .../zest/spi/entitystore/BackupRestore.java     |  25 +-
 .../zest/spi/entitystore/EntityStore.java       |  10 +-
 .../helpers/JSONMapEntityStoreMixin.java        | 125 +++----
 .../spi/entitystore/helpers/MapEntityStore.java |   6 +-
 .../helpers/MapEntityStoreMixin.java            | 112 +++---
 .../test/entity/AbstractEntityStoreTest.java    |  32 ++
 .../entitystore/file/FileEntityStoreMixin.java  |  98 ++----
 .../geode/GeodeEntityStoreMixin.java            |  29 +-
 .../hazelcast/HazelcastEntityStoreMixin.java    |  29 +-
 .../jclouds/JCloudsMapEntityStoreMixin.java     |  73 ++--
 .../entitystore/jdbm/JdbmEntityStoreMixin.java  | 293 ++++++++--------
 .../leveldb/LevelDBEntityStoreMixin.java        |  65 ++--
 .../mongodb/MongoMapEntityStoreMixin.java       |  42 +--
 .../prefs/PreferencesEntityStoreMixin.java      |  55 +--
 .../redis/RedisMapEntityStoreMixin.java         |  42 +--
 .../riak/RiakMapEntityStoreMixin.java           |  82 ++---
 .../entitystore/sql/SQLEntityStoreMixin.java    | 337 ++++++++++---------
 .../apache/zest/migration/MigrationTest.java    |  90 ++---
 .../reindexer/internal/ReindexerMixin.java      |  54 ++-
 .../zest/library/logging/DebuggingTest.java     |  31 +-
 .../apache/zest/library/sql/common/SQLUtil.java |  78 ++++-
 22 files changed, 739 insertions(+), 1076 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java b/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java
index 893e17a..79ad54d 100644
--- a/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java
+++ b/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java
@@ -23,18 +23,13 @@ import java.io.IOException;
 import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.io.Writer;
 import java.util.HashMap;
 import java.util.Map;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
+import java.util.stream.Stream;
 import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.entitystore.BackupRestore;
 import org.apache.zest.spi.entitystore.EntityAlreadyExistsException;
 import org.apache.zest.spi.entitystore.EntityNotFoundException;
@@ -42,6 +37,9 @@ import org.apache.zest.spi.entitystore.EntityStoreException;
 import org.apache.zest.spi.entitystore.helpers.JSONKeys;
 import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
 import org.apache.zest.spi.entitystore.helpers.MapEntityStoreActivation;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
 
 /**
  * In-memory implementation of MapEntityStore.
@@ -90,95 +88,34 @@ public class MemoryMapEntityStoreMixin
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates()
     {
-        return new Input<Reader, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<Reader, IOException>()
-                {
-                    @Override
-                    public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
-                        throws ReceiverThrowableType, IOException
-                    {
-                        for( String state : store.values() )
-                        {
-                            receiver.receive( new StringReader( state ) );
-                        }
-                    }
-                } );
-            }
-        };
+        return store.values().stream().map( StringReader::new );
     }
 
     @Override
-    public Input<String, IOException> backup()
+    public Stream<String> backup()
     {
-        return new Input<String, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<String, IOException>()
-                {
-                    @Override
-                    public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super String, ReceiverThrowableType> receiver )
-                        throws ReceiverThrowableType, IOException
-                    {
-                        for( String state : store.values() )
-                        {
-                            receiver.receive( state );
-                        }
-                    }
-                } );
-            }
-        };
+        return store.values().stream();
     }
 
     @Override
-    public Output<String, IOException> restore()
+    public void restore( final Stream<String> stream )
     {
-        return new Output<String, IOException>()
-        {
-            @Override
-            public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
-                throws IOException, SenderThrowableType
+        store.clear();
+        stream.forEach( item -> {
+            try
             {
-                store.clear();
-
-                try
-                {
-                    sender.sendTo( new Receiver<String, IOException>()
-                    {
-                        @Override
-                        public void receive( String item )
-                            throws IOException
-                        {
-                            try
-                            {
-                                JSONTokener tokener = new JSONTokener( item );
-                                JSONObject entity = (JSONObject) tokener.nextValue();
-                                String id = entity.getString( JSONKeys.IDENTITY );
-                                store.put( EntityReference.parseEntityReference( id ), item );
-                            }
-                            catch( JSONException e )
-                            {
-                                throw new IOException( e );
-                            }
-                        }
-                    } );
-                }
-                catch( IOException e )
-                {
-                    store.clear();
-                    throw e;
-                }
+                JSONTokener tokener = new JSONTokener( item );
+                JSONObject entity = (JSONObject) tokener.nextValue();
+                String id = entity.getString( JSONKeys.IDENTITY );
+                store.put( EntityReference.parseEntityReference( id ), item );
+            }
+            catch( JSONException e )
+            {
+                throw new UncheckedIOException( new IOException( e ) );
             }
-        };
+        } );
     }
 
     private class MemoryMapChanger

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java
index e7cb5be..63aa3aa 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java
@@ -20,9 +20,8 @@
 
 package org.apache.zest.spi.entitystore;
 
-import java.io.IOException;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
 
 /**
  * Allow backups and restores of data in an EntityStore to be made
@@ -30,16 +29,22 @@ import org.apache.zest.io.Output;
 public interface BackupRestore
 {
     /**
-     * Input that allows data from the entity store to be backed up.
-     *
-     * @return An Input instance containing the data to back up.
+     * Backup as a stream of serialized entity states, must be closed.
+     */
+    Stream<String> backup();
+
+    /**
+     * Restore from a stream of serialized entity states.
      */
-    Input<String, IOException> backup();
+    void restore( Stream<String> states );
 
     /**
-     * Output that allows data to be restored from a backup.
+     * Restore from streams of serialized entity states.
      *
-     * @return An Output instance to receive the restored data.
+     * @return A consumer of streams of serialized entity states
      */
-    Output<String, IOException> restore();
+    default Consumer<Stream<String>> restore()
+    {
+        return this::restore;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java
index 71752b0..2e002c2 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java
@@ -20,9 +20,9 @@
 package org.apache.zest.spi.entitystore;
 
 import java.time.Instant;
+import java.util.stream.Stream;
 import org.apache.zest.api.structure.ModuleDescriptor;
 import org.apache.zest.api.usecase.Usecase;
-import org.apache.zest.io.Input;
 import org.apache.zest.spi.entity.EntityState;
 
 /**
@@ -32,5 +32,11 @@ public interface EntityStore
 {
     EntityStoreUnitOfWork newUnitOfWork( ModuleDescriptor module, Usecase usecase, Instant currentTime );
 
-    Input<EntityState, EntityStoreException> entityStates( ModuleDescriptor module );
+    /**
+     * Stream of all entity states, must be closed.
+     *
+     * @param module Module
+     * @return Stream of all entity states, must be closed
+     */
+    Stream<EntityState> entityStates( ModuleDescriptor module );
 }

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java
index 90d5acb..ebaa2f2 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java
@@ -29,6 +29,7 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.stream.Stream;
 import org.apache.zest.api.cache.CacheOptions;
 import org.apache.zest.api.common.Optional;
 import org.apache.zest.api.entity.EntityDescriptor;
@@ -47,10 +48,6 @@ import org.apache.zest.api.structure.ModuleDescriptor;
 import org.apache.zest.api.unitofwork.NoSuchEntityTypeException;
 import org.apache.zest.api.usecase.Usecase;
 import org.apache.zest.api.value.ValueSerialization;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.ZestSPI;
 import org.apache.zest.spi.cache.Cache;
 import org.apache.zest.spi.cache.CachePool;
@@ -310,102 +307,62 @@ public class JSONMapEntityStoreMixin
     }
 
     @Override
-    public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module )
+    public Stream<EntityState> entityStates( ModuleDescriptor module )
     {
-        return new Input<EntityState, EntityStoreException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output )
-                throws EntityStoreException, ReceiverThrowableType
+        List<EntityState> migrated = new ArrayList<>();
+        return mapEntityStore.entityStates().map(
+            reader ->
             {
-                output.receiveFrom( new Sender<EntityState, EntityStoreException>()
+                EntityState entity = readEntityState( module, reader );
+                if( entity.status() == EntityStatus.UPDATED )
                 {
-                    @Override
-                    public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super EntityState, ReceiverThrowableType> receiver )
-                        throws ReceiverThrowableType, EntityStoreException
+                    migrated.add( entity );
+                    // Synch back 100 at a time
+                    if( migrated.size() > 100 )
                     {
-                        final List<EntityState> migrated = new ArrayList<>();
-                        try
-                        {
-                            mapEntityStore.entityStates().transferTo( new Output<Reader, ReceiverThrowableType>()
-                            {
-                                @Override
-                                public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends Reader, SenderThrowableType> sender )
-                                    throws ReceiverThrowableType, SenderThrowableType
-                                {
-                                    sender.sendTo( new Receiver<Reader, ReceiverThrowableType>()
-                                    {
-                                        @Override
-                                        public void receive( Reader item )
-                                            throws ReceiverThrowableType
-                                        {
-                                            final EntityState entity = readEntityState( module, item );
-                                            if( entity.status() == EntityStatus.UPDATED )
-                                            {
-                                                migrated.add( entity );
-
-                                                // Synch back 100 at a time
-                                                if( migrated.size() > 100 )
-                                                {
-                                                    try
-                                                    {
-                                                        synchMigratedEntities( migrated );
-                                                    }
-                                                    catch( IOException e )
-                                                    {
-                                                        throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
-                                                    }
-                                                }
-                                            }
-                                            receiver.receive( entity );
-                                        }
-                                    } );
-
-                                    // Synch any remaining migrated entities
-                                    if( !migrated.isEmpty() )
-                                    {
-                                        try
-                                        {
-                                            synchMigratedEntities( migrated );
-                                        }
-                                        catch( IOException e )
-                                        {
-                                            throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
-                                        }
-                                    }
-                                }
-                            } );
-                        }
-                        catch( IOException e )
-                        {
-                            throw new EntityStoreException( e );
-                        }
+                        synchMigratedEntities( migrated );
                     }
-                } );
+                }
+                return entity;
             }
-        };
+        ).onClose(
+            () ->
+            {
+                // Synch any remaining migrated entities
+                if( !migrated.isEmpty() )
+                {
+                    synchMigratedEntities( migrated );
+                }
+            }
+        );
     }
 
     private void synchMigratedEntities( final List<EntityState> migratedEntities )
-        throws IOException
     {
-        mapEntityStore.applyChanges( new MapEntityStore.MapChanges()
+        try
         {
-            @Override
-            public void visitMap( MapEntityStore.MapChanger changer )
-                throws IOException
+            mapEntityStore.applyChanges( new MapEntityStore.MapChanges()
             {
-                for( EntityState migratedEntity : migratedEntities )
+                @Override
+                public void visitMap( MapEntityStore.MapChanger changer )
+                    throws IOException
                 {
-                    JSONEntityState state = (JSONEntityState) migratedEntity;
-                    try (Writer writer = changer.updateEntity( state.entityReference(), state.entityDescriptor() ))
+                    for( EntityState migratedEntity : migratedEntities )
                     {
-                        writeEntityState( state, writer, state.version(), state.lastModified() );
+                        JSONEntityState state = (JSONEntityState) migratedEntity;
+                        try( Writer writer = changer.updateEntity( state.entityReference(), state.entityDescriptor() ) )
+                        {
+                            writeEntityState( state, writer, state.version(), state.lastModified() );
+                        }
                     }
                 }
-            }
-        } );
-        migratedEntities.clear();
+            } );
+            migratedEntities.clear();
+        }
+        catch( IOException ex )
+        {
+            throw new EntityStoreException( "Synchronization of Migrated Entities failed.", ex );
+        }
     }
 
     protected Identity newUnitOfWorkId()

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java
index 2ec20a0..435494f 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java
@@ -22,9 +22,9 @@ package org.apache.zest.spi.entitystore.helpers;
 import java.io.IOException;
 import java.io.Reader;
 import java.io.Writer;
+import java.util.stream.Stream;
 import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
-import org.apache.zest.io.Input;
 import org.apache.zest.spi.entitystore.EntityNotFoundException;
 import org.apache.zest.spi.entitystore.EntityStoreException;
 
@@ -42,9 +42,9 @@ public interface MapEntityStore
         throws EntityStoreException;
 
     /**
-     * @return All entities state Readers
+     * @return All entities state Readers, must be closed
      */
-    Input<Reader, IOException> entityStates();
+    Stream<Reader> entityStates();
 
     void applyChanges( MapChanges changes )
         throws IOException;

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java
index fceae54..8a61ced 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 import org.apache.zest.api.common.Optional;
 import org.apache.zest.api.common.QualifiedName;
 import org.apache.zest.api.entity.EntityDescriptor;
@@ -46,10 +47,6 @@ import org.apache.zest.api.unitofwork.NoSuchEntityTypeException;
 import org.apache.zest.api.usecase.Usecase;
 import org.apache.zest.api.value.ValueSerialization;
 import org.apache.zest.api.value.ValueSerializationException;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.ZestSPI;
 import org.apache.zest.spi.entity.EntityState;
 import org.apache.zest.spi.entity.EntityStatus;
@@ -200,74 +197,47 @@ public class MapEntityStoreMixin
     }
 
     @Override
-    public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module )
+    public Stream<EntityState> entityStates( final ModuleDescriptor module )
     {
-        return new Input<EntityState, EntityStoreException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output )
-                throws EntityStoreException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<EntityState, EntityStoreException>()
-                {
-                    @Override
-                    public <RecThrowableType extends Throwable> void sendTo( final Receiver<? super EntityState, RecThrowableType> receiver )
-                        throws RecThrowableType, EntityStoreException
-                    {
-                        final List<EntityState> migrated = new ArrayList<>();
-                        try
-                        {
-                            mapEntityStore.entityStates().transferTo( new Output<Reader, RecThrowableType>()
-                            {
-                                @Override
-                                public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends Reader, SenderThrowableType> sender )
-                                    throws RecThrowableType, SenderThrowableType
-                                {
-                                    sender.sendTo( item -> {
-                                        final EntityState entity = readEntityState( module, item );
-                                        if( entity.status() == EntityStatus.UPDATED )
-                                        {
-                                            migrated.add( entity );
-
-                                            // Synch back 100 at a time
-                                            if( migrated.size() > 100 )
-                                            {
-                                                try
-                                                {
-                                                    synchMigratedEntities( migrated );
-                                                }
-                                                catch( IOException e )
-                                                {
-                                                    throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
-                                                }
-                                            }
-                                        }
-                                        receiver.receive( entity );
-                                    } );
-
-                                    // Synch any remaining migrated entities
-                                    if( !migrated.isEmpty() )
-                                    {
-                                        try
-                                        {
-                                            synchMigratedEntities( migrated );
-                                        }
-                                        catch( IOException e )
-                                        {
-                                            throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
-                                        }
-                                    }
-                                }
-                            } );
-                        }
-                        catch( IOException e )
-                        {
-                            throw new EntityStoreException( e );
-                        }
-                    }
-                } );
-            }
-        };
+        List<EntityState> migrated = new ArrayList<>();
+        return mapEntityStore
+            .entityStates()
+            .map( reader ->
+                  {
+                      EntityState entity = readEntityState( module, reader );
+                      if( entity.status() == EntityStatus.UPDATED )
+                      {
+                          migrated.add( entity );
+                          // Synch back once more than 100 migrated entities have accumulated
+                          if( migrated.size() > 100 )
+                          {
+                              try
+                              {
+                                  synchMigratedEntities( migrated );
+                              }
+                              catch( IOException e )
+                              {
+                                  throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
+                              }
+                          }
+                      }
+                      return entity;
+                  } )
+            .onClose( () ->
+                      {
+                          // On stream close, synch any migrated entities that are still pending
+                          if( !migrated.isEmpty() )
+                          {
+                              try
+                              {
+                                  synchMigratedEntities( migrated );
+                              }
+                              catch( IOException e )
+                              {
+                                  throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
+                              }
+                          }
+                      } );
     }
 
     private void synchMigratedEntities( final List<EntityState> migratedEntities )

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java
----------------------------------------------------------------------
diff --git a/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java b/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java
index ab05f39..74368ae 100644
--- a/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java
+++ b/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java
@@ -31,6 +31,7 @@ import java.time.ZonedDateTime;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 import org.apache.zest.api.association.Association;
 import org.apache.zest.api.association.ManyAssociation;
 import org.apache.zest.api.association.NamedAssociation;
@@ -51,6 +52,7 @@ import org.apache.zest.api.value.ValueBuilder;
 import org.apache.zest.api.value.ValueComposite;
 import org.apache.zest.bootstrap.AssemblyException;
 import org.apache.zest.bootstrap.ModuleAssembly;
+import org.apache.zest.spi.entity.EntityState;
 import org.apache.zest.spi.entitystore.EntityStore;
 import org.apache.zest.test.AbstractZestTest;
 import org.junit.After;
@@ -501,6 +503,36 @@ public abstract class AbstractEntityStoreTest
         }
     }
 
+    @Test
+    public void entityStatesSPI()
+    {
+        EntityStore entityStore = serviceFinder.findService( EntityStore.class ).get();
+
+        try( Stream<EntityState> states = entityStore.entityStates( module ) )
+        {
+            assertThat( states.count(), is( 0L ) );
+        }
+
+        UnitOfWork unitOfWork = unitOfWorkFactory.newUnitOfWork();
+        TestEntity newInstance = createEntity( unitOfWork );
+        unitOfWork.complete();
+
+        try( Stream<EntityState> states = entityStore.entityStates( module ) )
+        {
+            assertThat( states.count(), is( 1L ) );
+        }
+
+        unitOfWork = unitOfWorkFactory.newUnitOfWork();
+        TestEntity instance = unitOfWork.get( newInstance );
+        unitOfWork.remove( instance );
+        unitOfWork.complete();
+
+        try( Stream<EntityState> states = entityStore.entityStates( module ) )
+        {
+            assertThat( states.count(), is( 0L ) );
+        }
+    }
+
     public interface TestEntity
         extends EntityComposite
     {

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java b/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java
index 1ab1c53..500f313 100644
--- a/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java
+++ b/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java
@@ -29,16 +29,13 @@ import java.io.Writer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
+import java.util.stream.Stream;
 import org.apache.zest.api.common.Optional;
 import org.apache.zest.api.configuration.Configuration;
 import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
 import org.apache.zest.api.injection.scope.Service;
 import org.apache.zest.api.injection.scope.This;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.library.fileconfig.FileConfiguration;
 import org.apache.zest.spi.entitystore.BackupRestore;
 import org.apache.zest.spi.entitystore.EntityAlreadyExistsException;
@@ -231,84 +228,43 @@ public class FileEntityStoreMixin
     }
 
     @Override
-    public Input<String, IOException> backup()
+    public Stream<Reader> entityStates()
     {
-        return new Input<String, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<String, IOException>()
-                {
-                    @Override
-                    public <ThrowableType extends Throwable> void sendTo( Receiver<? super String, ThrowableType> receiver )
-                        throws ThrowableType, IOException
-                    {
-                        for( File sliceDirectory : dataDirectory.listFiles() )
-                        {
-                            for( File file : sliceDirectory.listFiles() )
-                            {
-                                receiver.receive( fetch( file ) );
-                            }
-                        }
-                    }
-                } );
-            }
-        };
+        return backup().map( StringReader::new );
     }
 
     @Override
-    public Output<String, IOException> restore()
+    public Stream<String> backup()
     {
-        return new Output<String, IOException>()
+        if( !dataDirectory.exists() )
         {
-            @Override
-            public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
-                throws IOException, SenderThrowableType
-            {
-                sender.sendTo( new Receiver<String, IOException>()
-                {
-                    @Override
-                    public void receive( String item )
-                        throws IOException
-                    {
-                        String id = item.substring( "{\"reference\":\"".length() );
-                        id = id.substring( 0, id.indexOf( '"' ) );
-                        store( getDataFile( id ), item );
-                    }
-                } );
-            }
-        };
+            return Stream.of();
+        }
+        try
+        {
+            return java.nio.file.Files.walk( dataDirectory.toPath(), 3 )
+                                      .skip( 1 )
+                                      .filter( path -> !"slices".equals( path.getFileName().toString() ) )
+                                      .map( Path::toFile )
+                                      .filter( file -> !file.isDirectory() )
+                                      .map( this::uncheckedFetch );
+        }
+        catch( IOException ex )
+        {
+            throw new EntityStoreException( ex );
+        }
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public void restore( final Stream<String> stream )
     {
-        return new Input<Reader, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
+        stream.forEach(
+            item ->
             {
-                output.receiveFrom( new Sender<Reader, IOException>()
-                {
-                    @Override
-                    public <ThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ThrowableType> receiver )
-                        throws ThrowableType, IOException
-                    {
-                        for( File sliceDirectory : dataDirectory.listFiles() )
-                        {
-                            for( File file : sliceDirectory.listFiles() )
-                            {
-                                String state = fetch( file );
-                                receiver.receive( new StringReader( state ) );
-                            }
-                        }
-                    }
-                } );
-            }
-        };
+                String id = item.substring( "{\"reference\":\"".length() );
+                id = id.substring( 0, id.indexOf( '"' ) );
+                uncheckedStore( getDataFile( id ), item );
+            } );
     }
 
     private File getDataFile( String identity )

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java b/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java
index fca8a6b..25a58b7 100644
--- a/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java
+++ b/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java
@@ -23,8 +23,8 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.io.Writer;
-import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Stream;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
@@ -39,10 +39,6 @@ import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
 import org.apache.zest.api.injection.scope.This;
 import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.entitystore.EntityNotFoundException;
 import org.apache.zest.spi.entitystore.EntityStoreException;
 import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
@@ -197,27 +193,8 @@ public class GeodeEntityStoreMixin
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates()
     {
-        return new Input<Reader, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                    throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<Reader, IOException>()
-                {
-                    @Override
-                    public <RTT extends Throwable> void sendTo( Receiver<? super Reader, RTT> receiver )
-                            throws RTT, IOException
-                    {
-                        for( Map.Entry<String, String> eachEntry : region.entrySet() )
-                        {
-                            receiver.receive( new StringReader( eachEntry.getValue() ) );
-                        }
-                    }
-                } );
-            }
-        };
+        return region.values().stream().map( StringReader::new );
     }
 }

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java b/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java
index f588862..34386f2 100644
--- a/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java
+++ b/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java
@@ -30,16 +30,12 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.io.Writer;
-import java.util.Map;
+import java.util.stream.Stream;
 import org.apache.zest.api.configuration.Configuration;
 import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
 import org.apache.zest.api.injection.scope.This;
 import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.entitystore.EntityNotFoundException;
 import org.apache.zest.spi.entitystore.EntityStoreException;
 import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
@@ -147,28 +143,9 @@ public class HazelcastEntityStoreMixin
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates()
     {
-        return new Input<Reader, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<Reader, IOException>()
-                {
-                    @Override
-                    public <RTT extends Throwable> void sendTo( Receiver<? super Reader, RTT> receiver )
-                        throws RTT, IOException
-                    {
-                        for( Map.Entry<String, String> eachEntry : stringMap.entrySet() )
-                        {
-                            receiver.receive( new StringReader( eachEntry.getValue() ) );
-                        }
-                    }
-                } );
-            }
-        };
+        return stringMap.values().stream().map( StringReader::new );
     }
 
     private Config createConfig( HazelcastConfiguration configuration )

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java b/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java
index f8c2ac1..b895177 100644
--- a/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java
+++ b/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java
@@ -26,7 +26,6 @@ import com.google.common.collect.Maps;
 import com.google.common.io.ByteSource;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
@@ -36,15 +35,12 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Scanner;
 import java.util.Set;
+import java.util.stream.Stream;
 import org.apache.zest.api.configuration.Configuration;
 import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
 import org.apache.zest.api.injection.scope.This;
 import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.entitystore.EntityNotFoundException;
 import org.apache.zest.spi.entitystore.EntityStoreException;
 import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
@@ -54,7 +50,6 @@ import org.jclouds.apis.Apis;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
 import org.jclouds.blobstore.domain.Blob;
-import org.jclouds.blobstore.domain.StorageMetadata;
 import org.jclouds.io.Payload;
 import org.jclouds.providers.ProviderMetadata;
 import org.jclouds.providers.Providers;
@@ -254,52 +249,26 @@ public class JCloudsMapEntityStoreMixin
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates()
     {
-        return new Input<Reader, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom(
-                    new Sender<Reader, IOException>()
-                    {
-                        @Override
-                        public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
-                            throws ReceiverThrowableType, IOException
-                        {
-                            for( StorageMetadata stored : storeContext.getBlobStore().list() )
-                            {
-                                Payload payload = storeContext.getBlobStore().getBlob( container, stored.getName() ).getPayload();
-                                if( payload == null )
-                                {
-                                    throw new EntityNotFoundException( EntityReference.parseEntityReference( stored.getName() ) );
-                                }
-                                InputStream input = null;
-                                try
-                                {
-                                    input = payload.openStream();
-                                    receiver.receive( new InputStreamReader( input, "UTF-8" ) );
-                                }
-                                finally
-                                {
-                                    if( input != null )
-                                    {
-                                        try
-                                        {
-                                            input.close();
-                                        }
-                                        catch( IOException ignored )
-                                        {
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                );
-            }
-        };
+        return storeContext
+            .getBlobStore().list( container ).stream()
+            .map( metadata ->
+                  {
+                      Payload payload = storeContext.getBlobStore().getBlob( container, metadata.getName() ).getPayload();
+                      if( payload == null )
+                      {
+                          throw new EntityNotFoundException( EntityReference.parseEntityReference( metadata.getName() ) );
+                      }
+                      try( InputStream input = payload.openStream() )
+                      {
+                          String state = new Scanner( input, UTF_8.name() ).useDelimiter( "\\Z" ).next();
+                          return (Reader) new StringReader( state );
+                      }
+                      catch( IOException ex )
+                      {
+                          throw new EntityStoreException( ex );
+                      }
+                  } );
     }
 }

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java b/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java
index fe3a9cf..19c2b7d 100644
--- a/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java
+++ b/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java
@@ -24,9 +24,16 @@ import java.io.IOException;
 import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.io.Writer;
 import java.util.Properties;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import jdbm.RecordManager;
 import jdbm.RecordManagerFactory;
 import jdbm.RecordManagerOptions;
@@ -48,10 +55,6 @@ import org.apache.zest.api.injection.scope.This;
 import org.apache.zest.api.injection.scope.Uses;
 import org.apache.zest.api.service.ServiceDescriptor;
 import org.apache.zest.io.Files;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.library.fileconfig.FileConfiguration;
 import org.apache.zest.library.locking.ReadLock;
 import org.apache.zest.library.locking.WriteLock;
@@ -213,198 +216,166 @@ public class JdbmEntityStoreMixin
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates() // streams every stored entity state as a Reader
     {
-        return new Input<Reader, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
-            {
-                lock.writeLock().lock();
-
-                try
-                {
-                    output.receiveFrom( new Sender<Reader, IOException>()
-                    {
-                        @Override
-                        public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
-                            throws ReceiverThrowableType, IOException
-                        {
-                            final TupleBrowser browser = index.browse();
-                            final Tuple tuple = new Tuple();
-
-                            while( browser.getNext( tuple ) )
-                            {
-                                Identity id = new StringIdentity( (byte[]) tuple.getKey() );
-
-                                Long stateIndex = getStateIndex( id );
-
-                                if( stateIndex == null )
-                                {
-                                    continue;
-                                } // Skip this one
-
-                                byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer );
-
-                                receiver.receive( new StringReader( new String( serializedState, "UTF-8" ) ) );
-                            }
-                        }
-                    } );
-                }
-                finally
-                {
-                    lock.writeLock().unlock();
-                }
-            }
-        };
+        return backup().map( StringReader::new ); // reuses backup(): each state String is wrapped in a StringReader; the caller must close the stream
     }
 
     @Override
-    public Input<String, IOException> backup()
+    public Stream<String> backup() // streams every stored state as a String; the caller MUST close the stream
     {
-        return new Input<String, IOException>()
+        lock.writeLock().lock(); // held until the returned stream is closed, by the same thread
+        TupleBrowser browser;
+        try
         {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
+            browser = index.browse();
+        }
+        catch( IOException ex )
+        {
+            lock.writeLock().unlock(); // no stream is returned on this path, so release here
+            throw new EntityStoreException( ex );
+        }
+        return StreamSupport.stream(
+            new Spliterators.AbstractSpliterator<String>( Long.MAX_VALUE, Spliterator.ORDERED )
             {
-                lock.readLock().lock();
+                private final Tuple tuple = new Tuple();
 
-                try
+                @Override
+                public boolean tryAdvance( final Consumer<? super String> action )
                 {
-                    output.receiveFrom( new Sender<String, IOException>()
+                    try
                     {
-                        @Override
-                        public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super String, ReceiverThrowableType> receiver )
-                            throws ReceiverThrowableType, IOException
+                        if( !browser.getNext( tuple ) )
                         {
-                            final TupleBrowser browser = index.browse();
-                            final Tuple tuple = new Tuple();
-
-                            while( browser.getNext( tuple ) )
-                            {
-                                String id = new String( (byte[]) tuple.getKey(), "UTF-8" );
-
-                                Long stateIndex = getStateIndex( new StringIdentity( id ) );
-
-                                if( stateIndex == null )
-                                {
-                                    continue;
-                                } // Skip this one
-
-                                byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer );
-
-                                receiver.receive( new String( serializedState, "UTF-8" ) );
-                            }
+                            return false;
                         }
-                    } );
-                }
-                finally
-                {
-                    lock.readLock().unlock();
+                        Identity identity = new StringIdentity( (byte[]) tuple.getKey() );
+                        Long stateIndex = getStateIndex( identity );
+                        if( stateIndex == null )
+                        {
+                            return tryAdvance( action ); // entry without state: skip it instead of ending the stream
+                        }
+                        byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer );
+                        String state = new String( serializedState, "UTF-8" );
+                        action.accept( state );
+                        return true;
+                    }
+                    catch( IOException ex )
+                    {
+                        // do not unlock here: the onClose handler is the single release point
+                        throw new EntityStoreException( ex );
+                    }
                 }
-            }
-        };
+            },
+            false
+        ).onClose( () -> lock.writeLock().unlock() );
     }
 
     @Override
-    public Output<String, IOException> restore()
+    public void restore( final Stream<String> states ) // imports the states into a temporary store, then swaps it in
     {
-        return new Output<String, IOException>()
+        File dbFile = new File( getDatabaseName() + ".db" );
+        File lgFile = new File( getDatabaseName() + ".lg" );
+
+        // Create temporary store
+        File tempDatabase = Files.createTemporayFileOf( dbFile );
+        final RecordManager recordManager;
+        final BTree index;
+        try
+        {
+            recordManager = RecordManagerFactory.createRecordManager( tempDatabase.getAbsolutePath(),
+                                                                      new Properties() );
+            ByteArrayComparator comparator = new ByteArrayComparator();
+            index = BTree.createInstance( recordManager, comparator, serializer, DefaultSerializer.INSTANCE, 16 );
+            recordManager.setNamedObject( "index", index.getRecid() );
+            recordManager.commit();
+        }
+        catch( IOException ex )
+        {
+            throw new EntityStoreException( ex );
+        }
+        try
         {
-            @Override
-            public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
-                throws IOException, SenderThrowableType
+            // TODO NO NEED TO SYNCHRONIZE HERE
+            AtomicLong counter = new AtomicLong();
+            Consumer<String> stateConsumer = state ->
             {
-                File dbFile = new File( getDatabaseName() + ".db" );
-                File lgFile = new File( getDatabaseName() + ".lg" );
-
-                // Create temporary store
-                File tempDatabase = Files.createTemporayFileOf( dbFile );
-
-                final RecordManager recordManager = RecordManagerFactory.createRecordManager( tempDatabase.getAbsolutePath(), new Properties() );
-                ByteArrayComparator comparator = new ByteArrayComparator();
-                final BTree index = BTree.createInstance( recordManager, comparator, serializer, DefaultSerializer.INSTANCE, 16 );
-                recordManager.setNamedObject( "index", index.getRecid() );
-                recordManager.commit();
-
                 try
                 {
-                    sender.sendTo( new Receiver<String, IOException>()
+                    // Commit one batch
+                    if( ( counter.incrementAndGet() % 1000 ) == 0 )
                     {
-                        int counter = 0;
-
-                        @Override
-                        public void receive( String item )
-                            throws IOException
-                        {
-                            // Commit one batch
-                            if( ( counter++ % 1000 ) == 0 )
-                            {
-                                recordManager.commit();
-                            }
+                        recordManager.commit();
+                    }
 
-                            String id = item.substring( "{\"reference\":\"".length() );
-                            id = id.substring( 0, id.indexOf( '"' ) );
+                    String id = state.substring( "{\"reference\":\"".length() ); // the id is read from the leading {"reference":" prefix
+                    id = id.substring( 0, id.indexOf( '"' ) );
 
-                            // Insert
-                            byte[] stateArray = item.getBytes( "UTF-8" );
-                            long stateIndex = recordManager.insert( stateArray, serializer );
-                            index.insert( id.getBytes( "UTF-8" ), stateIndex, false );
-                        }
-                    } );
+                    // Insert
+                    byte[] stateArray = state.getBytes( "UTF-8" );
+                    long stateIndex = recordManager.insert( stateArray, serializer );
+                    index.insert( id.getBytes( "UTF-8" ), stateIndex, false );
                 }
-                catch( IOException e )
+                catch( IOException ex )
                 {
-                    recordManager.close();
-                    tempDatabase.delete();
-                    throw e;
+                    throw new UncheckedIOException( ex ); // tunnels the checked exception out of the lambda
                 }
-                catch( Throwable senderThrowableType )
-                {
-                    recordManager.close();
-                    tempDatabase.delete();
-                    throw (SenderThrowableType) senderThrowableType;
-                }
-
-                // Import went ok - continue
-                recordManager.commit();
-                // close file handles otherwise Microsoft Windows will fail to rename database files.
+            };
+            states.forEach( stateConsumer );
+            // Import went ok - continue
+            recordManager.commit();
+            // close file handles otherwise Microsoft Windows will fail to rename database files.
+            recordManager.close();
+        }
+        catch( IOException | RuntimeException ex ) // any import failure, not only I/O, must release the temporary store
+        {
+            try
+            {
                 recordManager.close();
+            }
+            catch( IOException ignore ) { } // best-effort close; the original failure is rethrown below
+            tempDatabase.delete();
+            throw new EntityStoreException( ex );
+        }
+        try
+        {
 
-                lock.writeLock().lock();
-                try
-                {
-                    // Replace old database with new
-                    JdbmEntityStoreMixin.this.recordManager.close();
-
-                    boolean deletedOldDatabase = true;
-                    deletedOldDatabase &= dbFile.delete();
-                    deletedOldDatabase &= lgFile.delete();
-                    if( !deletedOldDatabase )
-                    {
-                        throw new IOException( "Could not remove old database" );
-                    }
+            lock.writeLock().lock();
+            try
+            {
+                // Replace old database with new
+                JdbmEntityStoreMixin.this.recordManager.close();
 
-                    boolean renamedTempDatabase = true;
-                    renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".db" ).renameTo( dbFile );
-                    renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".lg" ).renameTo( lgFile );
+                boolean deletedOldDatabase = true;
+                deletedOldDatabase &= dbFile.delete();
+                deletedOldDatabase &= lgFile.delete();
+                if( !deletedOldDatabase )
+                {
+                    throw new EntityStoreException( "Could not remove old database" );
+                }
 
-                    if( !renamedTempDatabase )
-                    {
-                        throw new IOException( "Could not replace database with temp database" );
-                    }
+                boolean renamedTempDatabase = true;
+                renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".db" ).renameTo( dbFile );
+                renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".lg" ).renameTo( lgFile );
 
-                    // Start up again
-                    initialize();
-                }
-                finally
+                if( !renamedTempDatabase )
                 {
-                    lock.writeLock().unlock();
+                    throw new EntityStoreException( "Could not replace database with temp database" );
                 }
+
+                // Start up again
+                initialize();
             }
-        };
+            finally
+            {
+                lock.writeLock().unlock();
+            }
+        }
+        catch( IOException ex )
+        {
+            tempDatabase.delete();
+            throw new EntityStoreException( ex );
+        }
     }
 
     private String getDatabaseName()

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java b/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java
index 638b021..7e485b2 100644
--- a/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java
+++ b/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java
@@ -26,6 +26,11 @@ import java.io.StringReader;
 import java.io.StringWriter;
 import java.io.Writer;
 import java.nio.charset.Charset;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import org.apache.zest.api.configuration.Configuration;
 import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
@@ -34,10 +39,6 @@ import org.apache.zest.api.injection.scope.This;
 import org.apache.zest.api.injection.scope.Uses;
 import org.apache.zest.api.service.ServiceActivation;
 import org.apache.zest.api.service.ServiceDescriptor;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.library.fileconfig.FileConfiguration;
 import org.apache.zest.spi.entitystore.EntityNotFoundException;
 import org.apache.zest.spi.entitystore.EntityStoreException;
@@ -197,42 +198,38 @@ public class LevelDBEntityStoreMixin
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates() // streams every stored value as a Reader; the caller must close the stream
     {
-        return new Input<Reader, IOException>()
-        {
-
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
+        DBIterator iterator = db.iterator();
+        iterator.seekToFirst();
+        return StreamSupport.stream(
+            new Spliterators.AbstractSpliterator<Reader>( Long.MAX_VALUE, Spliterator.ORDERED )
             {
-                output.receiveFrom( new Sender<Reader, IOException>()
+                @Override
+                public boolean tryAdvance( final Consumer<? super Reader> action )
                 {
-
-                    @Override
-                    public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
-                        throws ReceiverThrowableType, IOException
+                    if( !iterator.hasNext() )
                     {
-                        DBIterator iterator = db.iterator();
-                        try
-                        {
-                            for( iterator.seekToFirst(); iterator.hasNext(); iterator.next() )
-                            {
-                                byte[] state = iterator.peekNext().getValue();
-                                String jsonState = new String( state, charset );
-                                receiver.receive( new StringReader( jsonState ) );
-                            }
-                        }
-                        finally
-                        {
-                            iterator.close();
-                        }
+                        return false;
                     }
-
-                } );
+                    action.accept( new StringReader( new String( iterator.next().getValue(), charset ) ) );
+                    return true;
+                }
+            },
+            false
+        ).onClose(
+            () ->
+            {
+                try
+                {
+                    iterator.close(); // the iterator is only released when the stream is closed
+                }
+                catch( IOException ex )
+                {
+                    throw new EntityStoreException( "Unable to close DB iterator", ex ); // keep the cause for diagnosis
+                }
             }
-
-        };
+        );
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java b/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java
index 9b988f5..ee97171 100644
--- a/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java
+++ b/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java
@@ -25,7 +25,6 @@ import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoCredential;
 import com.mongodb.ServerAddress;
 import com.mongodb.WriteConcern;
-import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoDatabase;
@@ -40,15 +39,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import org.apache.zest.api.configuration.Configuration;
 import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
 import org.apache.zest.api.injection.scope.This;
 import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.entitystore.EntityNotFoundException;
 import org.apache.zest.spi.entitystore.EntityStoreException;
 import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
@@ -289,33 +286,16 @@ public class MongoMapEntityStoreMixin
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates() // streams the state of every document in the collection as a Reader
     {
-        return new Input<Reader, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo(
-                Output<? super Reader, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<Reader, IOException>()
-                {
-                    @Override
-                    public <ReceiverThrowableType extends Throwable> void sendTo(
-                        Receiver<? super Reader, ReceiverThrowableType> receiver )
-                        throws ReceiverThrowableType, IOException
-                    {
-                        FindIterable<Document> cursor = db.getCollection( collectionName ).find();
-                        for( Document eachEntity : cursor )
-                        {
-                            Document bsonState = (Document) eachEntity.get( STATE_COLUMN );
-                            String jsonState = JSON.serialize( bsonState );
-                            receiver.receive( new StringReader( jsonState ) );
-                        }
-                    }
-                } );
-            }
-        };
+        return StreamSupport
+            .stream( db.getCollection( collectionName ).find().spliterator(), false ) // sequential, unfiltered find()
+            .map( eachEntity ->
+                  {
+                      Document bsonState = (Document) eachEntity.get( STATE_COLUMN ); // only the state column is exported
+                      String jsonState = JSON.serialize( bsonState );
+                      return new StringReader( jsonState );
+                  } );
     }
 
     private Bson byIdentity( EntityReference entityReference )

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java b/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java
index f76f8d3..09918a2 100644
--- a/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java
+++ b/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.prefs.BackingStoreException;
 import java.util.prefs.Preferences;
+import java.util.stream.Stream;
 import org.apache.zest.api.cache.CacheOptions;
 import org.apache.zest.api.common.QualifiedName;
 import org.apache.zest.api.entity.EntityDescriptor;
@@ -57,10 +58,6 @@ import org.apache.zest.api.usecase.Usecase;
 import org.apache.zest.api.usecase.UsecaseBuilder;
 import org.apache.zest.api.value.ValueSerialization;
 import org.apache.zest.api.value.ValueSerializationException;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.ZestSPI;
 import org.apache.zest.spi.entity.EntityState;
 import org.apache.zest.spi.entity.EntityStatus;
@@ -175,43 +172,23 @@ public class PreferencesEntityStoreMixin
     }
 
     @Override
-    public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module )
+    public Stream<EntityState> entityStates( final ModuleDescriptor module ) // streams the EntityState of every child node of root
     {
-        return new Input<EntityState, EntityStoreException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output )
-                throws EntityStoreException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<EntityState, EntityStoreException>()
-                {
-                    @Override
-                    public <RecThrowableType extends Throwable> void sendTo( Receiver<? super EntityState, RecThrowableType> receiver )
-                        throws RecThrowableType, EntityStoreException
-                    {
-                        UsecaseBuilder builder = UsecaseBuilder.buildUsecase( "zest.entitystore.preferences.visit" );
-                        Usecase visitUsecase = builder.withMetaInfo( CacheOptions.NEVER ).newUsecase();
-                        final EntityStoreUnitOfWork uow =
-                            newUnitOfWork( module, visitUsecase, SystemTime.now() );
+        UsecaseBuilder builder = UsecaseBuilder.buildUsecase( "zest.entitystore.preferences.visit" );
+        Usecase visitUsecase = builder.withMetaInfo( CacheOptions.NEVER ).newUsecase();
+        EntityStoreUnitOfWork uow = newUnitOfWork( module, visitUsecase, SystemTime.now() ); // discarded when the returned stream is closed
 
-                        try
-                        {
-                            String[] identities = root.childrenNames();
-                            for( String identity : identities )
-                            {
-                                EntityReference reference = EntityReference.parseEntityReference( identity );
-                                EntityState entityState = uow.entityStateOf( module, reference );
-                                receiver.receive( entityState );
-                            }
-                        }
-                        catch( BackingStoreException e )
-                        {
-                            throw new EntityStoreException( e );
-                        }
-                    }
-                } );
-            }
-        };
+        try
+        {
+            return Stream.of( root.childrenNames() ) // child names are read eagerly; states are resolved lazily by the stream
+                         .map( EntityReference::parseEntityReference )
+                         .map( ref -> uow.entityStateOf( module, ref ) )
+                         .onClose( uow::discard );
+        }
+        catch( BackingStoreException e )
+        {
+            throw new EntityStoreException( e ); // NOTE(review): uow is not discarded on this path - confirm whether that matters
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java b/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java
index 4113e39..9bc2446 100644
--- a/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java
+++ b/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java
@@ -24,16 +24,12 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.io.Writer;
-import java.util.Set;
+import java.util.stream.Stream;
 import org.apache.zest.api.configuration.Configuration;
 import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
 import org.apache.zest.api.injection.scope.This;
 import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.entitystore.EntityAlreadyExistsException;
 import org.apache.zest.spi.entitystore.EntityNotFoundException;
 import org.apache.zest.spi.entitystore.EntityStoreException;
@@ -164,38 +160,12 @@ public class RedisMapEntityStoreMixin
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates() // streams the value of every key as a Reader; the caller must close the stream
     {
-        return new Input<Reader, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<Reader, IOException>()
-                {
-                    @Override
-                    public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
-                        throws ReceiverThrowableType, IOException
-                    {
-                        Jedis jedis = pool.getResource();
-                        try
-                        {
-                            Set<String> keys = jedis.keys( "*" );
-                            for( String key : keys )
-                            {
-                                String jsonState = jedis.get( key );
-                                receiver.receive( new StringReader( jsonState ) );
-                            }
-                        }
-                        finally
-                        {
-                            pool.returnResource( jedis );
-                        }
-                    }
-                } );
-            }
-        };
+        Jedis jedis = pool.getResource(); // closed by the onClose handler below
+        return jedis.keys( "*" ).stream() // NOTE(review): if keys() throws, jedis is not closed here - confirm
+                    .map( key -> (Reader) new StringReader( jedis.get( key ) ) ) // one GET per key, issued as the stream is consumed
+                    .onClose( jedis::close );
     }
 
     private static boolean notFound( String jsonState )

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java b/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java
index 0160dfa..fcda8fa 100644
--- a/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java
+++ b/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java
@@ -28,20 +28,6 @@ import com.basho.riak.client.core.RiakNode;
 import com.basho.riak.client.core.query.Location;
 import com.basho.riak.client.core.query.Namespace;
 import com.basho.riak.client.core.util.HostAndPort;
-import org.apache.zest.api.common.InvalidApplicationException;
-import org.apache.zest.api.configuration.Configuration;
-import org.apache.zest.api.entity.EntityDescriptor;
-import org.apache.zest.api.entity.EntityReference;
-import org.apache.zest.api.injection.scope.This;
-import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
-import org.apache.zest.spi.entitystore.EntityNotFoundException;
-import org.apache.zest.spi.entitystore.EntityStoreException;
-import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -57,6 +43,17 @@ import java.security.Security;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.zest.api.common.InvalidApplicationException;
+import org.apache.zest.api.configuration.Configuration;
+import org.apache.zest.api.entity.EntityDescriptor;
+import org.apache.zest.api.entity.EntityReference;
+import org.apache.zest.api.injection.scope.This;
+import org.apache.zest.api.service.ServiceActivation;
+import org.apache.zest.spi.entitystore.EntityNotFoundException;
+import org.apache.zest.spi.entitystore.EntityStoreException;
+import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
 
 /**
  * Riak Protobuf implementation of MapEntityStore.
@@ -334,40 +331,33 @@ public class RiakMapEntityStoreMixin implements ServiceActivation, MapEntityStor
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates()
     {
-        return new Input<Reader, IOException>()
+        try
         {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                    throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<Reader, IOException>()
-                {
-                    @Override
-                    public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
-                            throws ReceiverThrowableType, IOException
-                    {
-                        try
-                        {
-                            ListKeys listKeys = new ListKeys.Builder( namespace ).build();
-                            ListKeys.Response listKeysResponse = riakClient.execute( listKeys );
-                            for( Location location : listKeysResponse )
-                            {
-                                FetchValue fetch = new FetchValue.Builder( location ).build();
-                                FetchValue.Response response = riakClient.execute( fetch );
-                                String jsonState = response.getValue( String.class );
-                                receiver.receive( new StringReader( jsonState ) );
-                            }
-                        }
-                        catch( InterruptedException | ExecutionException ex )
-                        {
-                            throw new EntityStoreException( "Unable to apply entity changes.", ex );
-                        }
-                    }
-                } );
-            }
-        };
+            ListKeys listKeys = new ListKeys.Builder( namespace ).build();
+            ListKeys.Response listKeysResponse = riakClient.execute( listKeys );
+            return StreamSupport
+                .stream( listKeysResponse.spliterator(), false )
+                .map( location ->
+                      {
+                          try
+                          {
+                              FetchValue fetch = new FetchValue.Builder( location ).build();
+                              FetchValue.Response response = riakClient.execute( fetch );
+                              String jsonState = response.getValue( String.class );
+                              return new StringReader( jsonState );
+                          }
+                          catch( InterruptedException | ExecutionException ex )
+                          {
+                              throw new EntityStoreException( "Unable to get entity states.", ex );
+                          }
+                      } );
+        }
+        catch( InterruptedException | ExecutionException ex )
+        {
+            throw new EntityStoreException( "Unable to get entity states.", ex );
+        }
     }