You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@polygene.apache.org by pa...@apache.org on 2016/12/03 11:20:11 UTC

[2/2] zest-java git commit: entitystores: replace usage of core/io with streams

entitystores: replace usage of core/io with streams


Project: http://git-wip-us.apache.org/repos/asf/zest-java/repo
Commit: http://git-wip-us.apache.org/repos/asf/zest-java/commit/eb4e31a9
Tree: http://git-wip-us.apache.org/repos/asf/zest-java/tree/eb4e31a9
Diff: http://git-wip-us.apache.org/repos/asf/zest-java/diff/eb4e31a9

Branch: refs/heads/replace-io-by-streams
Commit: eb4e31a97a92609feb2e7bb18d135de432d511db
Parents: 7ac5338
Author: Paul Merlin <pa...@apache.org>
Authored: Sat Dec 3 12:12:22 2016 +0100
Committer: Paul Merlin <pa...@apache.org>
Committed: Sat Dec 3 12:12:22 2016 +0100

----------------------------------------------------------------------
 .../memory/MemoryMapEntityStoreMixin.java       | 107 ++----
 .../zest/spi/entitystore/BackupRestore.java     |  25 +-
 .../zest/spi/entitystore/EntityStore.java       |  10 +-
 .../helpers/JSONMapEntityStoreMixin.java        | 125 +++----
 .../spi/entitystore/helpers/MapEntityStore.java |   6 +-
 .../helpers/MapEntityStoreMixin.java            | 112 +++---
 .../test/entity/AbstractEntityStoreTest.java    |  32 ++
 .../entitystore/file/FileEntityStoreMixin.java  |  98 ++----
 .../geode/GeodeEntityStoreMixin.java            |  29 +-
 .../hazelcast/HazelcastEntityStoreMixin.java    |  29 +-
 .../jclouds/JCloudsMapEntityStoreMixin.java     |  73 ++--
 .../entitystore/jdbm/JdbmEntityStoreMixin.java  | 293 ++++++++--------
 .../leveldb/LevelDBEntityStoreMixin.java        |  65 ++--
 .../mongodb/MongoMapEntityStoreMixin.java       |  42 +--
 .../prefs/PreferencesEntityStoreMixin.java      |  55 +--
 .../redis/RedisMapEntityStoreMixin.java         |  42 +--
 .../riak/RiakMapEntityStoreMixin.java           |  82 ++---
 .../entitystore/sql/SQLEntityStoreMixin.java    | 337 ++++++++++---------
 .../apache/zest/migration/MigrationTest.java    |  90 ++---
 .../reindexer/internal/ReindexerMixin.java      |  54 ++-
 .../zest/library/logging/DebuggingTest.java     |  31 +-
 .../apache/zest/library/sql/common/SQLUtil.java |  78 ++++-
 22 files changed, 739 insertions(+), 1076 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java b/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java
index 893e17a..79ad54d 100644
--- a/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java
+++ b/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java
@@ -23,18 +23,13 @@ import java.io.IOException;
 import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.io.Writer;
 import java.util.HashMap;
 import java.util.Map;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
+import java.util.stream.Stream;
 import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.entitystore.BackupRestore;
 import org.apache.zest.spi.entitystore.EntityAlreadyExistsException;
 import org.apache.zest.spi.entitystore.EntityNotFoundException;
@@ -42,6 +37,9 @@ import org.apache.zest.spi.entitystore.EntityStoreException;
 import org.apache.zest.spi.entitystore.helpers.JSONKeys;
 import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
 import org.apache.zest.spi.entitystore.helpers.MapEntityStoreActivation;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
 
 /**
  * In-memory implementation of MapEntityStore.
@@ -90,95 +88,34 @@ public class MemoryMapEntityStoreMixin
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates()
     {
-        return new Input<Reader, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<Reader, IOException>()
-                {
-                    @Override
-                    public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
-                        throws ReceiverThrowableType, IOException
-                    {
-                        for( String state : store.values() )
-                        {
-                            receiver.receive( new StringReader( state ) );
-                        }
-                    }
-                } );
-            }
-        };
+        return store.values().stream().map( StringReader::new );
     }
 
     @Override
-    public Input<String, IOException> backup()
+    public Stream<String> backup()
     {
-        return new Input<String, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<String, IOException>()
-                {
-                    @Override
-                    public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super String, ReceiverThrowableType> receiver )
-                        throws ReceiverThrowableType, IOException
-                    {
-                        for( String state : store.values() )
-                        {
-                            receiver.receive( state );
-                        }
-                    }
-                } );
-            }
-        };
+        return store.values().stream();
     }
 
     @Override
-    public Output<String, IOException> restore()
+    public void restore( final Stream<String> stream )
     {
-        return new Output<String, IOException>()
-        {
-            @Override
-            public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
-                throws IOException, SenderThrowableType
+        store.clear();
+        stream.forEach( item -> {
+            try
             {
-                store.clear();
-
-                try
-                {
-                    sender.sendTo( new Receiver<String, IOException>()
-                    {
-                        @Override
-                        public void receive( String item )
-                            throws IOException
-                        {
-                            try
-                            {
-                                JSONTokener tokener = new JSONTokener( item );
-                                JSONObject entity = (JSONObject) tokener.nextValue();
-                                String id = entity.getString( JSONKeys.IDENTITY );
-                                store.put( EntityReference.parseEntityReference( id ), item );
-                            }
-                            catch( JSONException e )
-                            {
-                                throw new IOException( e );
-                            }
-                        }
-                    } );
-                }
-                catch( IOException e )
-                {
-                    store.clear();
-                    throw e;
-                }
+                JSONTokener tokener = new JSONTokener( item );
+                JSONObject entity = (JSONObject) tokener.nextValue();
+                String id = entity.getString( JSONKeys.IDENTITY );
+                store.put( EntityReference.parseEntityReference( id ), item );
+            }
+            catch( JSONException e )
+            {
+                throw new UncheckedIOException( new IOException( e ) );
             }
-        };
+        } );
     }
 
     private class MemoryMapChanger

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java
index e7cb5be..63aa3aa 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java
@@ -20,9 +20,8 @@
 
 package org.apache.zest.spi.entitystore;
 
-import java.io.IOException;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
 
 /**
  * Allow backups and restores of data in an EntityStore to be made
@@ -30,16 +29,22 @@ import org.apache.zest.io.Output;
 public interface BackupRestore
 {
     /**
-     * Input that allows data from the entity store to be backed up.
-     *
-     * @return An Input instance containing the data to back up.
+     * Backup as a stream of serialized entity states; the returned stream must be closed.
+     */
+    Stream<String> backup();
+
+    /**
+     * Restore from a stream of serialized entity states.
      */
-    Input<String, IOException> backup();
+    void restore( Stream<String> states );
 
     /**
-     * Output that allows data to be restored from a backup.
+     * Restore from streams of serialized entity states.
      *
-     * @return An Output instance to receive the restored data.
+     * @return A consumer of streams of serialized entity states
      */
-    Output<String, IOException> restore();
+    default Consumer<Stream<String>> restore()
+    {
+        return this::restore;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java
index 71752b0..2e002c2 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java
@@ -20,9 +20,9 @@
 package org.apache.zest.spi.entitystore;
 
 import java.time.Instant;
+import java.util.stream.Stream;
 import org.apache.zest.api.structure.ModuleDescriptor;
 import org.apache.zest.api.usecase.Usecase;
-import org.apache.zest.io.Input;
 import org.apache.zest.spi.entity.EntityState;
 
 /**
@@ -32,5 +32,11 @@ public interface EntityStore
 {
     EntityStoreUnitOfWork newUnitOfWork( ModuleDescriptor module, Usecase usecase, Instant currentTime );
 
-    Input<EntityState, EntityStoreException> entityStates( ModuleDescriptor module );
+    /**
+     * Stream of all entity states; the returned stream must be closed by the caller.
+     *
+     * @param module Module
+     * @return Stream of all entity states; the stream must be closed by the caller
+     */
+    Stream<EntityState> entityStates( ModuleDescriptor module );
 }

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java
index 90d5acb..ebaa2f2 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java
@@ -29,6 +29,7 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.stream.Stream;
 import org.apache.zest.api.cache.CacheOptions;
 import org.apache.zest.api.common.Optional;
 import org.apache.zest.api.entity.EntityDescriptor;
@@ -47,10 +48,6 @@ import org.apache.zest.api.structure.ModuleDescriptor;
 import org.apache.zest.api.unitofwork.NoSuchEntityTypeException;
 import org.apache.zest.api.usecase.Usecase;
 import org.apache.zest.api.value.ValueSerialization;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.ZestSPI;
 import org.apache.zest.spi.cache.Cache;
 import org.apache.zest.spi.cache.CachePool;
@@ -310,102 +307,62 @@ public class JSONMapEntityStoreMixin
     }
 
     @Override
-    public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module )
+    public Stream<EntityState> entityStates( ModuleDescriptor module )
     {
-        return new Input<EntityState, EntityStoreException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output )
-                throws EntityStoreException, ReceiverThrowableType
+        List<EntityState> migrated = new ArrayList<>();
+        return mapEntityStore.entityStates().map(
+            reader ->
             {
-                output.receiveFrom( new Sender<EntityState, EntityStoreException>()
+                EntityState entity = readEntityState( module, reader );
+                if( entity.status() == EntityStatus.UPDATED )
                 {
-                    @Override
-                    public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super EntityState, ReceiverThrowableType> receiver )
-                        throws ReceiverThrowableType, EntityStoreException
+                    migrated.add( entity );
+                    // Synch back 100 at a time
+                    if( migrated.size() > 100 )
                     {
-                        final List<EntityState> migrated = new ArrayList<>();
-                        try
-                        {
-                            mapEntityStore.entityStates().transferTo( new Output<Reader, ReceiverThrowableType>()
-                            {
-                                @Override
-                                public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends Reader, SenderThrowableType> sender )
-                                    throws ReceiverThrowableType, SenderThrowableType
-                                {
-                                    sender.sendTo( new Receiver<Reader, ReceiverThrowableType>()
-                                    {
-                                        @Override
-                                        public void receive( Reader item )
-                                            throws ReceiverThrowableType
-                                        {
-                                            final EntityState entity = readEntityState( module, item );
-                                            if( entity.status() == EntityStatus.UPDATED )
-                                            {
-                                                migrated.add( entity );
-
-                                                // Synch back 100 at a time
-                                                if( migrated.size() > 100 )
-                                                {
-                                                    try
-                                                    {
-                                                        synchMigratedEntities( migrated );
-                                                    }
-                                                    catch( IOException e )
-                                                    {
-                                                        throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
-                                                    }
-                                                }
-                                            }
-                                            receiver.receive( entity );
-                                        }
-                                    } );
-
-                                    // Synch any remaining migrated entities
-                                    if( !migrated.isEmpty() )
-                                    {
-                                        try
-                                        {
-                                            synchMigratedEntities( migrated );
-                                        }
-                                        catch( IOException e )
-                                        {
-                                            throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
-                                        }
-                                    }
-                                }
-                            } );
-                        }
-                        catch( IOException e )
-                        {
-                            throw new EntityStoreException( e );
-                        }
+                        synchMigratedEntities( migrated );
                     }
-                } );
+                }
+                return entity;
             }
-        };
+        ).onClose(
+            () ->
+            {
+                // Synch any remaining migrated entities
+                if( !migrated.isEmpty() )
+                {
+                    synchMigratedEntities( migrated );
+                }
+            }
+        );
     }
 
     private void synchMigratedEntities( final List<EntityState> migratedEntities )
-        throws IOException
     {
-        mapEntityStore.applyChanges( new MapEntityStore.MapChanges()
+        try
         {
-            @Override
-            public void visitMap( MapEntityStore.MapChanger changer )
-                throws IOException
+            mapEntityStore.applyChanges( new MapEntityStore.MapChanges()
             {
-                for( EntityState migratedEntity : migratedEntities )
+                @Override
+                public void visitMap( MapEntityStore.MapChanger changer )
+                    throws IOException
                 {
-                    JSONEntityState state = (JSONEntityState) migratedEntity;
-                    try (Writer writer = changer.updateEntity( state.entityReference(), state.entityDescriptor() ))
+                    for( EntityState migratedEntity : migratedEntities )
                     {
-                        writeEntityState( state, writer, state.version(), state.lastModified() );
+                        JSONEntityState state = (JSONEntityState) migratedEntity;
+                        try( Writer writer = changer.updateEntity( state.entityReference(), state.entityDescriptor() ) )
+                        {
+                            writeEntityState( state, writer, state.version(), state.lastModified() );
+                        }
                     }
                 }
-            }
-        } );
-        migratedEntities.clear();
+            } );
+            migratedEntities.clear();
+        }
+        catch( IOException ex )
+        {
+            throw new EntityStoreException( "Synchronization of Migrated Entities failed.", ex );
+        }
     }
 
     protected Identity newUnitOfWorkId()

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java
index 2ec20a0..435494f 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java
@@ -22,9 +22,9 @@ package org.apache.zest.spi.entitystore.helpers;
 import java.io.IOException;
 import java.io.Reader;
 import java.io.Writer;
+import java.util.stream.Stream;
 import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
-import org.apache.zest.io.Input;
 import org.apache.zest.spi.entitystore.EntityNotFoundException;
 import org.apache.zest.spi.entitystore.EntityStoreException;
 
@@ -42,9 +42,9 @@ public interface MapEntityStore
         throws EntityStoreException;
 
     /**
-     * @return All entities state Readers
+     * @return Stream of Readers over all entity states; the returned stream must be closed
      */
-    Input<Reader, IOException> entityStates();
+    Stream<Reader> entityStates();
 
     void applyChanges( MapChanges changes )
         throws IOException;

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java
index fceae54..8a61ced 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 import org.apache.zest.api.common.Optional;
 import org.apache.zest.api.common.QualifiedName;
 import org.apache.zest.api.entity.EntityDescriptor;
@@ -46,10 +47,6 @@ import org.apache.zest.api.unitofwork.NoSuchEntityTypeException;
 import org.apache.zest.api.usecase.Usecase;
 import org.apache.zest.api.value.ValueSerialization;
 import org.apache.zest.api.value.ValueSerializationException;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.ZestSPI;
 import org.apache.zest.spi.entity.EntityState;
 import org.apache.zest.spi.entity.EntityStatus;
@@ -200,74 +197,47 @@ public class MapEntityStoreMixin
     }
 
     @Override
-    public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module )
+    public Stream<EntityState> entityStates( final ModuleDescriptor module )
     {
-        return new Input<EntityState, EntityStoreException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output )
-                throws EntityStoreException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<EntityState, EntityStoreException>()
-                {
-                    @Override
-                    public <RecThrowableType extends Throwable> void sendTo( final Receiver<? super EntityState, RecThrowableType> receiver )
-                        throws RecThrowableType, EntityStoreException
-                    {
-                        final List<EntityState> migrated = new ArrayList<>();
-                        try
-                        {
-                            mapEntityStore.entityStates().transferTo( new Output<Reader, RecThrowableType>()
-                            {
-                                @Override
-                                public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends Reader, SenderThrowableType> sender )
-                                    throws RecThrowableType, SenderThrowableType
-                                {
-                                    sender.sendTo( item -> {
-                                        final EntityState entity = readEntityState( module, item );
-                                        if( entity.status() == EntityStatus.UPDATED )
-                                        {
-                                            migrated.add( entity );
-
-                                            // Synch back 100 at a time
-                                            if( migrated.size() > 100 )
-                                            {
-                                                try
-                                                {
-                                                    synchMigratedEntities( migrated );
-                                                }
-                                                catch( IOException e )
-                                                {
-                                                    throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
-                                                }
-                                            }
-                                        }
-                                        receiver.receive( entity );
-                                    } );
-
-                                    // Synch any remaining migrated entities
-                                    if( !migrated.isEmpty() )
-                                    {
-                                        try
-                                        {
-                                            synchMigratedEntities( migrated );
-                                        }
-                                        catch( IOException e )
-                                        {
-                                            throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
-                                        }
-                                    }
-                                }
-                            } );
-                        }
-                        catch( IOException e )
-                        {
-                            throw new EntityStoreException( e );
-                        }
-                    }
-                } );
-            }
-        };
+        List<EntityState> migrated = new ArrayList<>();
+        return mapEntityStore
+            .entityStates()
+            .map( reader ->
+                  {
+                      EntityState entity = readEntityState( module, reader );
+                      if( entity.status() == EntityStatus.UPDATED )
+                      {
+                          migrated.add( entity );
+                          // Synch back 100 at a time
+                          if( migrated.size() > 100 )
+                          {
+                              try
+                              {
+                                  synchMigratedEntities( migrated );
+                              }
+                              catch( IOException e )
+                              {
+                                  throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
+                              }
+                          }
+                      }
+                      return entity;
+                  } )
+            .onClose( () ->
+                      {
+                          // Synch any remaining migrated entities
+                          if( !migrated.isEmpty() )
+                          {
+                              try
+                              {
+                                  synchMigratedEntities( migrated );
+                              }
+                              catch( IOException e )
+                              {
+                                  throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
+                              }
+                          }
+                      } );
     }
 
     private void synchMigratedEntities( final List<EntityState> migratedEntities )

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java
----------------------------------------------------------------------
diff --git a/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java b/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java
index ab05f39..74368ae 100644
--- a/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java
+++ b/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java
@@ -31,6 +31,7 @@ import java.time.ZonedDateTime;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 import org.apache.zest.api.association.Association;
 import org.apache.zest.api.association.ManyAssociation;
 import org.apache.zest.api.association.NamedAssociation;
@@ -51,6 +52,7 @@ import org.apache.zest.api.value.ValueBuilder;
 import org.apache.zest.api.value.ValueComposite;
 import org.apache.zest.bootstrap.AssemblyException;
 import org.apache.zest.bootstrap.ModuleAssembly;
+import org.apache.zest.spi.entity.EntityState;
 import org.apache.zest.spi.entitystore.EntityStore;
 import org.apache.zest.test.AbstractZestTest;
 import org.junit.After;
@@ -501,6 +503,36 @@ public abstract class AbstractEntityStoreTest
         }
     }
 
+    @Test
+    public void entityStatesSPI()
+    {
+        EntityStore entityStore = serviceFinder.findService( EntityStore.class ).get();
+
+        try( Stream<EntityState> states = entityStore.entityStates( module ) )
+        {
+            assertThat( states.count(), is( 0L ) );
+        }
+
+        UnitOfWork unitOfWork = unitOfWorkFactory.newUnitOfWork();
+        TestEntity newInstance = createEntity( unitOfWork );
+        unitOfWork.complete();
+
+        try( Stream<EntityState> states = entityStore.entityStates( module ) )
+        {
+            assertThat( states.count(), is( 1L ) );
+        }
+
+        unitOfWork = unitOfWorkFactory.newUnitOfWork();
+        TestEntity instance = unitOfWork.get( newInstance );
+        unitOfWork.remove( instance );
+        unitOfWork.complete();
+
+        try( Stream<EntityState> states = entityStore.entityStates( module ) )
+        {
+            assertThat( states.count(), is( 0L ) );
+        }
+    }
+
     public interface TestEntity
         extends EntityComposite
     {

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java b/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java
index 1ab1c53..500f313 100644
--- a/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java
+++ b/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java
@@ -29,16 +29,13 @@ import java.io.Writer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
+import java.util.stream.Stream;
 import org.apache.zest.api.common.Optional;
 import org.apache.zest.api.configuration.Configuration;
 import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
 import org.apache.zest.api.injection.scope.Service;
 import org.apache.zest.api.injection.scope.This;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.library.fileconfig.FileConfiguration;
 import org.apache.zest.spi.entitystore.BackupRestore;
 import org.apache.zest.spi.entitystore.EntityAlreadyExistsException;
@@ -231,84 +228,43 @@ public class FileEntityStoreMixin
     }
 
     @Override
-    public Input<String, IOException> backup()
+    public Stream<Reader> entityStates()
     {
-        return new Input<String, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<String, IOException>()
-                {
-                    @Override
-                    public <ThrowableType extends Throwable> void sendTo( Receiver<? super String, ThrowableType> receiver )
-                        throws ThrowableType, IOException
-                    {
-                        for( File sliceDirectory : dataDirectory.listFiles() )
-                        {
-                            for( File file : sliceDirectory.listFiles() )
-                            {
-                                receiver.receive( fetch( file ) );
-                            }
-                        }
-                    }
-                } );
-            }
-        };
+        return backup().map( StringReader::new );
     }
 
     @Override
-    public Output<String, IOException> restore()
+    public Stream<String> backup()
     {
-        return new Output<String, IOException>()
+        if( !dataDirectory.exists() )
         {
-            @Override
-            public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
-                throws IOException, SenderThrowableType
-            {
-                sender.sendTo( new Receiver<String, IOException>()
-                {
-                    @Override
-                    public void receive( String item )
-                        throws IOException
-                    {
-                        String id = item.substring( "{\"reference\":\"".length() );
-                        id = id.substring( 0, id.indexOf( '"' ) );
-                        store( getDataFile( id ), item );
-                    }
-                } );
-            }
-        };
+            return Stream.of();
+        }
+        try
+        {
+            return java.nio.file.Files.walk( dataDirectory.toPath(), 3 )
+                                      .skip( 1 )
+                                      .filter( path -> !"slices".equals( path.getFileName().toString() ) )
+                                      .map( Path::toFile )
+                                      .filter( file -> !file.isDirectory() )
+                                      .map( this::uncheckedFetch );
+        }
+        catch( IOException ex )
+        {
+            throw new EntityStoreException( ex );
+        }
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public void restore( final Stream<String> stream )
     {
-        return new Input<Reader, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
+        stream.forEach(
+            item ->
             {
-                output.receiveFrom( new Sender<Reader, IOException>()
-                {
-                    @Override
-                    public <ThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ThrowableType> receiver )
-                        throws ThrowableType, IOException
-                    {
-                        for( File sliceDirectory : dataDirectory.listFiles() )
-                        {
-                            for( File file : sliceDirectory.listFiles() )
-                            {
-                                String state = fetch( file );
-                                receiver.receive( new StringReader( state ) );
-                            }
-                        }
-                    }
-                } );
-            }
-        };
+                String id = item.substring( "{\"reference\":\"".length() );
+                id = id.substring( 0, id.indexOf( '"' ) );
+                uncheckedStore( getDataFile( id ), item );
+            } );
     }
 
     private File getDataFile( String identity )

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java b/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java
index fca8a6b..25a58b7 100644
--- a/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java
+++ b/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java
@@ -23,8 +23,8 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.io.Writer;
-import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Stream;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
@@ -39,10 +39,6 @@ import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
 import org.apache.zest.api.injection.scope.This;
 import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.entitystore.EntityNotFoundException;
 import org.apache.zest.spi.entitystore.EntityStoreException;
 import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
@@ -197,27 +193,8 @@ public class GeodeEntityStoreMixin
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates()
     {
-        return new Input<Reader, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                    throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<Reader, IOException>()
-                {
-                    @Override
-                    public <RTT extends Throwable> void sendTo( Receiver<? super Reader, RTT> receiver )
-                            throws RTT, IOException
-                    {
-                        for( Map.Entry<String, String> eachEntry : region.entrySet() )
-                        {
-                            receiver.receive( new StringReader( eachEntry.getValue() ) );
-                        }
-                    }
-                } );
-            }
-        };
+        return region.values().stream().map( StringReader::new );
     }
 }

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java b/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java
index f588862..34386f2 100644
--- a/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java
+++ b/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java
@@ -30,16 +30,12 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.io.Writer;
-import java.util.Map;
+import java.util.stream.Stream;
 import org.apache.zest.api.configuration.Configuration;
 import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
 import org.apache.zest.api.injection.scope.This;
 import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.entitystore.EntityNotFoundException;
 import org.apache.zest.spi.entitystore.EntityStoreException;
 import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
@@ -147,28 +143,9 @@ public class HazelcastEntityStoreMixin
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates()
     {
-        return new Input<Reader, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<Reader, IOException>()
-                {
-                    @Override
-                    public <RTT extends Throwable> void sendTo( Receiver<? super Reader, RTT> receiver )
-                        throws RTT, IOException
-                    {
-                        for( Map.Entry<String, String> eachEntry : stringMap.entrySet() )
-                        {
-                            receiver.receive( new StringReader( eachEntry.getValue() ) );
-                        }
-                    }
-                } );
-            }
-        };
+        return stringMap.values().stream().map( StringReader::new );
     }
 
     private Config createConfig( HazelcastConfiguration configuration )

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java b/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java
index f8c2ac1..b895177 100644
--- a/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java
+++ b/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java
@@ -26,7 +26,6 @@ import com.google.common.collect.Maps;
 import com.google.common.io.ByteSource;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
@@ -36,15 +35,12 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Scanner;
 import java.util.Set;
+import java.util.stream.Stream;
 import org.apache.zest.api.configuration.Configuration;
 import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
 import org.apache.zest.api.injection.scope.This;
 import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.entitystore.EntityNotFoundException;
 import org.apache.zest.spi.entitystore.EntityStoreException;
 import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
@@ -54,7 +50,6 @@ import org.jclouds.apis.Apis;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
 import org.jclouds.blobstore.domain.Blob;
-import org.jclouds.blobstore.domain.StorageMetadata;
 import org.jclouds.io.Payload;
 import org.jclouds.providers.ProviderMetadata;
 import org.jclouds.providers.Providers;
@@ -254,52 +249,26 @@ public class JCloudsMapEntityStoreMixin
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates()
     {
-        return new Input<Reader, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom(
-                    new Sender<Reader, IOException>()
-                    {
-                        @Override
-                        public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
-                            throws ReceiverThrowableType, IOException
-                        {
-                            for( StorageMetadata stored : storeContext.getBlobStore().list() )
-                            {
-                                Payload payload = storeContext.getBlobStore().getBlob( container, stored.getName() ).getPayload();
-                                if( payload == null )
-                                {
-                                    throw new EntityNotFoundException( EntityReference.parseEntityReference( stored.getName() ) );
-                                }
-                                InputStream input = null;
-                                try
-                                {
-                                    input = payload.openStream();
-                                    receiver.receive( new InputStreamReader( input, "UTF-8" ) );
-                                }
-                                finally
-                                {
-                                    if( input != null )
-                                    {
-                                        try
-                                        {
-                                            input.close();
-                                        }
-                                        catch( IOException ignored )
-                                        {
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                );
-            }
-        };
+        return storeContext
+            .getBlobStore().list( container ).stream()
+            .map( metadata ->
+                  {
+                      Payload payload = storeContext.getBlobStore().getBlob( container, metadata.getName() ).getPayload();
+                      if( payload == null )
+                      {
+                          throw new EntityNotFoundException( EntityReference.parseEntityReference( metadata.getName() ) );
+                      }
+                      try( InputStream input = payload.openStream() )
+                      {
+                          String state = new Scanner( input, UTF_8.name() ).useDelimiter( "\\Z" ).next();
+                          return (Reader) new StringReader( state );
+                      }
+                      catch( IOException ex )
+                      {
+                          throw new EntityStoreException( ex );
+                      }
+                  } );
     }
 }

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java b/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java
index fe3a9cf..19c2b7d 100644
--- a/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java
+++ b/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java
@@ -24,9 +24,16 @@ import java.io.IOException;
 import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.io.Writer;
 import java.util.Properties;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import jdbm.RecordManager;
 import jdbm.RecordManagerFactory;
 import jdbm.RecordManagerOptions;
@@ -48,10 +55,6 @@ import org.apache.zest.api.injection.scope.This;
 import org.apache.zest.api.injection.scope.Uses;
 import org.apache.zest.api.service.ServiceDescriptor;
 import org.apache.zest.io.Files;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.library.fileconfig.FileConfiguration;
 import org.apache.zest.library.locking.ReadLock;
 import org.apache.zest.library.locking.WriteLock;
@@ -213,198 +216,166 @@ public class JdbmEntityStoreMixin
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates()
     {
-        return new Input<Reader, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
-            {
-                lock.writeLock().lock();
-
-                try
-                {
-                    output.receiveFrom( new Sender<Reader, IOException>()
-                    {
-                        @Override
-                        public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
-                            throws ReceiverThrowableType, IOException
-                        {
-                            final TupleBrowser browser = index.browse();
-                            final Tuple tuple = new Tuple();
-
-                            while( browser.getNext( tuple ) )
-                            {
-                                Identity id = new StringIdentity( (byte[]) tuple.getKey() );
-
-                                Long stateIndex = getStateIndex( id );
-
-                                if( stateIndex == null )
-                                {
-                                    continue;
-                                } // Skip this one
-
-                                byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer );
-
-                                receiver.receive( new StringReader( new String( serializedState, "UTF-8" ) ) );
-                            }
-                        }
-                    } );
-                }
-                finally
-                {
-                    lock.writeLock().unlock();
-                }
-            }
-        };
+        return backup().map( StringReader::new );
     }
 
     @Override
-    public Input<String, IOException> backup()
+    public Stream<String> backup()
     {
-        return new Input<String, IOException>()
+        lock.writeLock().lock();
+        TupleBrowser browser;
+        try
         {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
+            browser = index.browse();
+        }
+        catch( IOException ex )
+        {
+            lock.writeLock().unlock();
+            throw new EntityStoreException( ex );
+        }
+        return StreamSupport.stream(
+            new Spliterators.AbstractSpliterator<String>( Long.MAX_VALUE, Spliterator.ORDERED )
             {
-                lock.readLock().lock();
+                private final Tuple tuple = new Tuple();
 
-                try
+                @Override
+                public boolean tryAdvance( final Consumer<? super String> action )
                 {
-                    output.receiveFrom( new Sender<String, IOException>()
+                    try
                     {
-                        @Override
-                        public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super String, ReceiverThrowableType> receiver )
-                            throws ReceiverThrowableType, IOException
+                        if( !browser.getNext( tuple ) )
                         {
-                            final TupleBrowser browser = index.browse();
-                            final Tuple tuple = new Tuple();
-
-                            while( browser.getNext( tuple ) )
-                            {
-                                String id = new String( (byte[]) tuple.getKey(), "UTF-8" );
-
-                                Long stateIndex = getStateIndex( new StringIdentity( id ) );
-
-                                if( stateIndex == null )
-                                {
-                                    continue;
-                                } // Skip this one
-
-                                byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer );
-
-                                receiver.receive( new String( serializedState, "UTF-8" ) );
-                            }
+                            return false;
                         }
-                    } );
-                }
-                finally
-                {
-                    lock.readLock().unlock();
+                        Identity identity = new StringIdentity( (byte[]) tuple.getKey() );
+                        Long stateIndex = getStateIndex( identity );
+                        if( stateIndex == null )
+                        {
+                            return false;
+                        }
+                        byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer );
+                        String state = new String( serializedState, "UTF-8" );
+                        action.accept( state );
+                        return true;
+                    }
+                    catch( IOException ex )
+                    {
+                        lock.writeLock().unlock();
+                        throw new EntityStoreException( ex );
+                    }
                 }
-            }
-        };
+            },
+            false
+        ).onClose( () -> lock.writeLock().unlock() );
     }
 
     @Override
-    public Output<String, IOException> restore()
+    public void restore( final Stream<String> states )
     {
-        return new Output<String, IOException>()
+        File dbFile = new File( getDatabaseName() + ".db" );
+        File lgFile = new File( getDatabaseName() + ".lg" );
+
+        // Create temporary store
+        File tempDatabase = Files.createTemporayFileOf( dbFile );
+        final RecordManager recordManager;
+        final BTree index;
+        try
+        {
+            recordManager = RecordManagerFactory.createRecordManager( tempDatabase.getAbsolutePath(),
+                                                                      new Properties() );
+            ByteArrayComparator comparator = new ByteArrayComparator();
+            index = BTree.createInstance( recordManager, comparator, serializer, DefaultSerializer.INSTANCE, 16 );
+            recordManager.setNamedObject( "index", index.getRecid() );
+            recordManager.commit();
+        }
+        catch( IOException ex )
+        {
+            throw new EntityStoreException( ex );
+        }
+        try
         {
-            @Override
-            public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
-                throws IOException, SenderThrowableType
+            // TODO NO NEED TO SYNCHRONIZE HERE
+            AtomicLong counter = new AtomicLong();
+            Consumer<String> stateConsumer = state ->
             {
-                File dbFile = new File( getDatabaseName() + ".db" );
-                File lgFile = new File( getDatabaseName() + ".lg" );
-
-                // Create temporary store
-                File tempDatabase = Files.createTemporayFileOf( dbFile );
-
-                final RecordManager recordManager = RecordManagerFactory.createRecordManager( tempDatabase.getAbsolutePath(), new Properties() );
-                ByteArrayComparator comparator = new ByteArrayComparator();
-                final BTree index = BTree.createInstance( recordManager, comparator, serializer, DefaultSerializer.INSTANCE, 16 );
-                recordManager.setNamedObject( "index", index.getRecid() );
-                recordManager.commit();
-
                 try
                 {
-                    sender.sendTo( new Receiver<String, IOException>()
+                    // Commit one batch
+                    if( ( counter.incrementAndGet() % 1000 ) == 0 )
                     {
-                        int counter = 0;
-
-                        @Override
-                        public void receive( String item )
-                            throws IOException
-                        {
-                            // Commit one batch
-                            if( ( counter++ % 1000 ) == 0 )
-                            {
-                                recordManager.commit();
-                            }
+                        recordManager.commit();
+                    }
 
-                            String id = item.substring( "{\"reference\":\"".length() );
-                            id = id.substring( 0, id.indexOf( '"' ) );
+                    String id = state.substring( "{\"reference\":\"".length() );
+                    id = id.substring( 0, id.indexOf( '"' ) );
 
-                            // Insert
-                            byte[] stateArray = item.getBytes( "UTF-8" );
-                            long stateIndex = recordManager.insert( stateArray, serializer );
-                            index.insert( id.getBytes( "UTF-8" ), stateIndex, false );
-                        }
-                    } );
+                    // Insert
+                    byte[] stateArray = state.getBytes( "UTF-8" );
+                    long stateIndex = recordManager.insert( stateArray, serializer );
+                    index.insert( id.getBytes( "UTF-8" ), stateIndex, false );
                 }
-                catch( IOException e )
+                catch( IOException ex )
                 {
-                    recordManager.close();
-                    tempDatabase.delete();
-                    throw e;
+                    throw new UncheckedIOException( ex );
                 }
-                catch( Throwable senderThrowableType )
-                {
-                    recordManager.close();
-                    tempDatabase.delete();
-                    throw (SenderThrowableType) senderThrowableType;
-                }
-
-                // Import went ok - continue
-                recordManager.commit();
-                // close file handles otherwise Microsoft Windows will fail to rename database files.
+            };
+            states.forEach( stateConsumer );
+            // Import went ok - continue
+            recordManager.commit();
+            // close file handles otherwise Microsoft Windows will fail to rename database files.
+            recordManager.close();
+        }
+        catch( IOException | UncheckedIOException ex )
+        {
+            try
+            {
                 recordManager.close();
+            }
+            catch( IOException ignore ) { }
+            tempDatabase.delete();
+            throw new EntityStoreException( ex );
+        }
+        try
+        {
 
-                lock.writeLock().lock();
-                try
-                {
-                    // Replace old database with new
-                    JdbmEntityStoreMixin.this.recordManager.close();
-
-                    boolean deletedOldDatabase = true;
-                    deletedOldDatabase &= dbFile.delete();
-                    deletedOldDatabase &= lgFile.delete();
-                    if( !deletedOldDatabase )
-                    {
-                        throw new IOException( "Could not remove old database" );
-                    }
+            lock.writeLock().lock();
+            try
+            {
+                // Replace old database with new
+                JdbmEntityStoreMixin.this.recordManager.close();
 
-                    boolean renamedTempDatabase = true;
-                    renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".db" ).renameTo( dbFile );
-                    renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".lg" ).renameTo( lgFile );
+                boolean deletedOldDatabase = true;
+                deletedOldDatabase &= dbFile.delete();
+                deletedOldDatabase &= lgFile.delete();
+                if( !deletedOldDatabase )
+                {
+                    throw new EntityStoreException( "Could not remove old database" );
+                }
 
-                    if( !renamedTempDatabase )
-                    {
-                        throw new IOException( "Could not replace database with temp database" );
-                    }
+                boolean renamedTempDatabase = true;
+                renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".db" ).renameTo( dbFile );
+                renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".lg" ).renameTo( lgFile );
 
-                    // Start up again
-                    initialize();
-                }
-                finally
+                if( !renamedTempDatabase )
                 {
-                    lock.writeLock().unlock();
+                    throw new EntityStoreException( "Could not replace database with temp database" );
                 }
+
+                // Start up again
+                initialize();
             }
-        };
+            finally
+            {
+                lock.writeLock().unlock();
+            }
+        }
+        catch( IOException ex )
+        {
+            tempDatabase.delete();
+            throw new EntityStoreException( ex );
+        }
     }
 
     private String getDatabaseName()

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java b/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java
index 638b021..7e485b2 100644
--- a/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java
+++ b/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java
@@ -26,6 +26,11 @@ import java.io.StringReader;
 import java.io.StringWriter;
 import java.io.Writer;
 import java.nio.charset.Charset;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import org.apache.zest.api.configuration.Configuration;
 import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
@@ -34,10 +39,6 @@ import org.apache.zest.api.injection.scope.This;
 import org.apache.zest.api.injection.scope.Uses;
 import org.apache.zest.api.service.ServiceActivation;
 import org.apache.zest.api.service.ServiceDescriptor;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.library.fileconfig.FileConfiguration;
 import org.apache.zest.spi.entitystore.EntityNotFoundException;
 import org.apache.zest.spi.entitystore.EntityStoreException;
@@ -197,42 +198,38 @@ public class LevelDBEntityStoreMixin
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates() // callers must close the returned stream: the DB iterator is only closed by its onClose handler
     {
-        return new Input<Reader, IOException>()
-        {
-
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
+        DBIterator iterator = db.iterator(); // opened eagerly, before the stream is consumed
+        iterator.seekToFirst();
+        return StreamSupport.stream(
+            new Spliterators.AbstractSpliterator<Reader>( Long.MAX_VALUE, Spliterator.ORDERED ) // size is unknown, hence the Long.MAX_VALUE estimate
             {
-                output.receiveFrom( new Sender<Reader, IOException>()
+                @Override
+                public boolean tryAdvance( final Consumer<? super Reader> action )
                 {
-
-                    @Override
-                    public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
-                        throws ReceiverThrowableType, IOException
+                    if( !iterator.hasNext() )
                     {
-                        DBIterator iterator = db.iterator();
-                        try
-                        {
-                            for( iterator.seekToFirst(); iterator.hasNext(); iterator.next() )
-                            {
-                                byte[] state = iterator.peekNext().getValue();
-                                String jsonState = new String( state, charset );
-                                receiver.receive( new StringReader( jsonState ) );
-                            }
-                        }
-                        finally
-                        {
-                            iterator.close();
-                        }
+                        return false;
                     }
-
-                } );
+                    action.accept( new StringReader( new String( iterator.next().getValue(), charset ) ) );
+                    return true;
+                }
+            },
+            false // sequential stream
+        ).onClose(
+            () ->
+            {
+                try
+                {
+                    iterator.close();
+                }
+                catch( IOException ex )
+                {
+                    throw new EntityStoreException( "Unable to close DB iterator", ex ); // keep the IOException as cause
+                }
             }
-
-        };
+        );
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java b/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java
index 9b988f5..ee97171 100644
--- a/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java
+++ b/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java
@@ -25,7 +25,6 @@ import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoCredential;
 import com.mongodb.ServerAddress;
 import com.mongodb.WriteConcern;
-import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoDatabase;
@@ -40,15 +39,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import org.apache.zest.api.configuration.Configuration;
 import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
 import org.apache.zest.api.injection.scope.This;
 import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.entitystore.EntityNotFoundException;
 import org.apache.zest.spi.entitystore.EntityStoreException;
 import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
@@ -289,33 +286,16 @@ public class MongoMapEntityStoreMixin
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates() // one Reader of serialized JSON state per document found in the collection
     {
-        return new Input<Reader, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo(
-                Output<? super Reader, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<Reader, IOException>()
-                {
-                    @Override
-                    public <ReceiverThrowableType extends Throwable> void sendTo(
-                        Receiver<? super Reader, ReceiverThrowableType> receiver )
-                        throws ReceiverThrowableType, IOException
-                    {
-                        FindIterable<Document> cursor = db.getCollection( collectionName ).find();
-                        for( Document eachEntity : cursor )
-                        {
-                            Document bsonState = (Document) eachEntity.get( STATE_COLUMN );
-                            String jsonState = JSON.serialize( bsonState );
-                            receiver.receive( new StringReader( jsonState ) );
-                        }
-                    }
-                } );
-            }
-        };
+        return StreamSupport
+            .stream( db.getCollection( collectionName ).find().spliterator(), false ) // unfiltered find(), exposed as a sequential stream
+            .map( eachEntity ->
+                  {
+                      Document bsonState = (Document) eachEntity.get( STATE_COLUMN ); // the state is read from the STATE_COLUMN entry and cast to Document
+                      String jsonState = JSON.serialize( bsonState );
+                      return new StringReader( jsonState );
+                  } );
     }
 
     private Bson byIdentity( EntityReference entityReference )

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java b/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java
index f76f8d3..09918a2 100644
--- a/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java
+++ b/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.prefs.BackingStoreException;
 import java.util.prefs.Preferences;
+import java.util.stream.Stream;
 import org.apache.zest.api.cache.CacheOptions;
 import org.apache.zest.api.common.QualifiedName;
 import org.apache.zest.api.entity.EntityDescriptor;
@@ -57,10 +58,6 @@ import org.apache.zest.api.usecase.Usecase;
 import org.apache.zest.api.usecase.UsecaseBuilder;
 import org.apache.zest.api.value.ValueSerialization;
 import org.apache.zest.api.value.ValueSerializationException;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.ZestSPI;
 import org.apache.zest.spi.entity.EntityState;
 import org.apache.zest.spi.entity.EntityStatus;
@@ -175,43 +172,25 @@ public class PreferencesEntityStoreMixin
     }
 
     @Override
-    public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module )
+    public Stream<EntityState> entityStates( final ModuleDescriptor module )
     {
-        return new Input<EntityState, EntityStoreException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output )
-                throws EntityStoreException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<EntityState, EntityStoreException>()
-                {
-                    @Override
-                    public <RecThrowableType extends Throwable> void sendTo( Receiver<? super EntityState, RecThrowableType> receiver )
-                        throws RecThrowableType, EntityStoreException
-                    {
-                        UsecaseBuilder builder = UsecaseBuilder.buildUsecase( "zest.entitystore.preferences.visit" );
-                        Usecase visitUsecase = builder.withMetaInfo( CacheOptions.NEVER ).newUsecase();
-                        final EntityStoreUnitOfWork uow =
-                            newUnitOfWork( module, visitUsecase, SystemTime.now() );
+        UsecaseBuilder builder = UsecaseBuilder.buildUsecase( "zest.entitystore.preferences.visit" );
+        Usecase visitUsecase = builder.withMetaInfo( CacheOptions.NEVER ).newUsecase();
+        EntityStoreUnitOfWork uow = newUnitOfWork( module, visitUsecase, SystemTime.now() );
 
-                        try
-                        {
-                            String[] identities = root.childrenNames();
-                            for( String identity : identities )
-                            {
-                                EntityReference reference = EntityReference.parseEntityReference( identity );
-                                EntityState entityState = uow.entityStateOf( module, reference );
-                                receiver.receive( entityState );
-                            }
-                        }
-                        catch( BackingStoreException e )
-                        {
-                            throw new EntityStoreException( e );
-                        }
-                    }
-                } );
-            }
-        };
+        try
+        {
+            return Stream.of( root.childrenNames() )
+                         .map( EntityReference::parseEntityReference )
+                         .map( ref -> uow.entityStateOf( module, ref ) )
+                         .onClose( uow::discard );
+        }
+        catch( BackingStoreException e )
+        {
+            // No stream is returned on this path, so its onClose handler can never discard the unit of work.
+            uow.discard();
+            throw new EntityStoreException( e );
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java b/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java
index 4113e39..9bc2446 100644
--- a/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java
+++ b/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java
@@ -24,16 +24,12 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.io.Writer;
-import java.util.Set;
+import java.util.stream.Stream;
 import org.apache.zest.api.configuration.Configuration;
 import org.apache.zest.api.entity.EntityDescriptor;
 import org.apache.zest.api.entity.EntityReference;
 import org.apache.zest.api.injection.scope.This;
 import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
 import org.apache.zest.spi.entitystore.EntityAlreadyExistsException;
 import org.apache.zest.spi.entitystore.EntityNotFoundException;
 import org.apache.zest.spi.entitystore.EntityStoreException;
@@ -164,38 +160,22 @@ public class RedisMapEntityStoreMixin
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates()
     {
-        return new Input<Reader, IOException>()
-        {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<Reader, IOException>()
-                {
-                    @Override
-                    public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
-                        throws ReceiverThrowableType, IOException
-                    {
-                        Jedis jedis = pool.getResource();
-                        try
-                        {
-                            Set<String> keys = jedis.keys( "*" );
-                            for( String key : keys )
-                            {
-                                String jsonState = jedis.get( key );
-                                receiver.receive( new StringReader( jsonState ) );
-                            }
-                        }
-                        finally
-                        {
-                            pool.returnResource( jedis );
-                        }
-                    }
-                } );
-            }
-        };
+        Jedis jedis = pool.getResource();
+        try
+        {
+            // jedis is only closed when the returned stream is closed, so callers must close the stream.
+            return jedis.keys( "*" ).stream()
+                        .map( key -> (Reader) new StringReader( jedis.get( key ) ) )
+                        .onClose( jedis::close );
+        }
+        catch( RuntimeException ex )
+        {
+            // Building the stream failed, so no onClose handler will ever run: close jedis here.
+            jedis.close();
+            throw ex;
+        }
     }
 
     private static boolean notFound( String jsonState )

http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java b/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java
index 0160dfa..fcda8fa 100644
--- a/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java
+++ b/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java
@@ -28,20 +28,6 @@ import com.basho.riak.client.core.RiakNode;
 import com.basho.riak.client.core.query.Location;
 import com.basho.riak.client.core.query.Namespace;
 import com.basho.riak.client.core.util.HostAndPort;
-import org.apache.zest.api.common.InvalidApplicationException;
-import org.apache.zest.api.configuration.Configuration;
-import org.apache.zest.api.entity.EntityDescriptor;
-import org.apache.zest.api.entity.EntityReference;
-import org.apache.zest.api.injection.scope.This;
-import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
-import org.apache.zest.spi.entitystore.EntityNotFoundException;
-import org.apache.zest.spi.entitystore.EntityStoreException;
-import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -57,6 +43,17 @@ import java.security.Security;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.zest.api.common.InvalidApplicationException;
+import org.apache.zest.api.configuration.Configuration;
+import org.apache.zest.api.entity.EntityDescriptor;
+import org.apache.zest.api.entity.EntityReference;
+import org.apache.zest.api.injection.scope.This;
+import org.apache.zest.api.service.ServiceActivation;
+import org.apache.zest.spi.entitystore.EntityNotFoundException;
+import org.apache.zest.spi.entitystore.EntityStoreException;
+import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
 
 /**
  * Riak Protobuf implementation of MapEntityStore.
@@ -334,40 +331,33 @@ public class RiakMapEntityStoreMixin implements ServiceActivation, MapEntityStor
     }
 
     @Override
-    public Input<Reader, IOException> entityStates()
+    public Stream<Reader> entityStates() // one Reader of JSON state per key listed in the namespace
     {
-        return new Input<Reader, IOException>()
+        try
         {
-            @Override
-            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
-                    throws IOException, ReceiverThrowableType
-            {
-                output.receiveFrom( new Sender<Reader, IOException>()
-                {
-                    @Override
-                    public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
-                            throws ReceiverThrowableType, IOException
-                    {
-                        try
-                        {
-                            ListKeys listKeys = new ListKeys.Builder( namespace ).build();
-                            ListKeys.Response listKeysResponse = riakClient.execute( listKeys );
-                            for( Location location : listKeysResponse )
-                            {
-                                FetchValue fetch = new FetchValue.Builder( location ).build();
-                                FetchValue.Response response = riakClient.execute( fetch );
-                                String jsonState = response.getValue( String.class );
-                                receiver.receive( new StringReader( jsonState ) );
-                            }
-                        }
-                        catch( InterruptedException | ExecutionException ex )
-                        {
-                            throw new EntityStoreException( "Unable to apply entity changes.", ex );
-                        }
-                    }
-                } );
-            }
-        };
+            ListKeys listKeys = new ListKeys.Builder( namespace ).build();
+            ListKeys.Response listKeysResponse = riakClient.execute( listKeys ); // the key listing runs eagerly, before the stream is returned
+            return StreamSupport
+                .stream( listKeysResponse.spliterator(), false ) // sequential stream over the listed locations
+                .map( location -> // values are fetched one request per key, as the stream is consumed
+                      {
+                          try
+                          {
+                              FetchValue fetch = new FetchValue.Builder( location ).build();
+                              FetchValue.Response response = riakClient.execute( fetch );
+                              String jsonState = response.getValue( String.class );
+                              return new StringReader( jsonState );
+                          }
+                          catch( InterruptedException | ExecutionException ex ) // NOTE(review): the interrupt flag is not restored when InterruptedException is caught here
+                          {
+                              throw new EntityStoreException( "Unable to get entity states.", ex );
+                          }
+                      } );
+        }
+        catch( InterruptedException | ExecutionException ex ) // NOTE(review): same as above, interrupt status is not re-asserted
+        {
+            throw new EntityStoreException( "Unable to get entity states.", ex );
+        }
     }