You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@polygene.apache.org by pa...@apache.org on 2016/12/09 16:28:33 UTC
[2/7] zest-java git commit: entitystores: replace usage of core/io
with streams
entitystores: replace usage of core/io with streams
Project: http://git-wip-us.apache.org/repos/asf/zest-java/repo
Commit: http://git-wip-us.apache.org/repos/asf/zest-java/commit/ee1d1abc
Tree: http://git-wip-us.apache.org/repos/asf/zest-java/tree/ee1d1abc
Diff: http://git-wip-us.apache.org/repos/asf/zest-java/diff/ee1d1abc
Branch: refs/heads/develop
Commit: ee1d1abccdd5e978f6a5216f1265958825139925
Parents: 8854b13
Author: Paul Merlin <pa...@apache.org>
Authored: Sat Dec 3 12:12:22 2016 +0100
Committer: Paul Merlin <pa...@apache.org>
Committed: Fri Dec 9 09:26:40 2016 +0100
----------------------------------------------------------------------
.../memory/MemoryMapEntityStoreMixin.java | 107 ++----
.../zest/spi/entitystore/BackupRestore.java | 25 +-
.../zest/spi/entitystore/EntityStore.java | 10 +-
.../helpers/JSONMapEntityStoreMixin.java | 125 +++----
.../spi/entitystore/helpers/MapEntityStore.java | 6 +-
.../helpers/MapEntityStoreMixin.java | 112 +++---
.../test/entity/AbstractEntityStoreTest.java | 32 ++
.../entitystore/file/FileEntityStoreMixin.java | 98 ++----
.../geode/GeodeEntityStoreMixin.java | 29 +-
.../hazelcast/HazelcastEntityStoreMixin.java | 29 +-
.../jclouds/JCloudsMapEntityStoreMixin.java | 73 ++--
.../entitystore/jdbm/JdbmEntityStoreMixin.java | 293 ++++++++--------
.../leveldb/LevelDBEntityStoreMixin.java | 65 ++--
.../mongodb/MongoMapEntityStoreMixin.java | 42 +--
.../prefs/PreferencesEntityStoreMixin.java | 55 +--
.../redis/RedisMapEntityStoreMixin.java | 42 +--
.../riak/RiakMapEntityStoreMixin.java | 82 ++---
.../entitystore/sql/SQLEntityStoreMixin.java | 337 ++++++++++---------
.../apache/zest/migration/MigrationTest.java | 90 ++---
.../reindexer/internal/ReindexerMixin.java | 54 ++-
.../zest/library/logging/DebuggingTest.java | 31 +-
.../apache/zest/library/sql/common/SQLUtil.java | 78 ++++-
22 files changed, 739 insertions(+), 1076 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java b/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java
index 893e17a..79ad54d 100644
--- a/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java
+++ b/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java
@@ -23,18 +23,13 @@ import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
+import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
+import java.util.stream.Stream;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.entitystore.BackupRestore;
import org.apache.zest.spi.entitystore.EntityAlreadyExistsException;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
@@ -42,6 +37,9 @@ import org.apache.zest.spi.entitystore.EntityStoreException;
import org.apache.zest.spi.entitystore.helpers.JSONKeys;
import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
import org.apache.zest.spi.entitystore.helpers.MapEntityStoreActivation;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
/**
* In-memory implementation of MapEntityStore.
@@ -90,95 +88,34 @@ public class MemoryMapEntityStoreMixin
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
- {
- for( String state : store.values() )
- {
- receiver.receive( new StringReader( state ) );
- }
- }
- } );
- }
- };
+ return store.values().stream().map( StringReader::new );
}
@Override
- public Input<String, IOException> backup()
+ public Stream<String> backup()
{
- return new Input<String, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<String, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super String, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
- {
- for( String state : store.values() )
- {
- receiver.receive( state );
- }
- }
- } );
- }
- };
+ return store.values().stream();
}
@Override
- public Output<String, IOException> restore()
+ public void restore( final Stream<String> stream )
{
- return new Output<String, IOException>()
- {
- @Override
- public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
- throws IOException, SenderThrowableType
+ store.clear();
+ stream.forEach( item -> {
+ try
{
- store.clear();
-
- try
- {
- sender.sendTo( new Receiver<String, IOException>()
- {
- @Override
- public void receive( String item )
- throws IOException
- {
- try
- {
- JSONTokener tokener = new JSONTokener( item );
- JSONObject entity = (JSONObject) tokener.nextValue();
- String id = entity.getString( JSONKeys.IDENTITY );
- store.put( EntityReference.parseEntityReference( id ), item );
- }
- catch( JSONException e )
- {
- throw new IOException( e );
- }
- }
- } );
- }
- catch( IOException e )
- {
- store.clear();
- throw e;
- }
+ JSONTokener tokener = new JSONTokener( item );
+ JSONObject entity = (JSONObject) tokener.nextValue();
+ String id = entity.getString( JSONKeys.IDENTITY );
+ store.put( EntityReference.parseEntityReference( id ), item );
+ }
+ catch( JSONException e )
+ {
+ throw new UncheckedIOException( new IOException( e ) );
}
- };
+ } );
}
private class MemoryMapChanger
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java
index e7cb5be..63aa3aa 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java
@@ -20,9 +20,8 @@
package org.apache.zest.spi.entitystore;
-import java.io.IOException;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
/**
* Allow backups and restores of data in an EntityStore to be made
@@ -30,16 +29,22 @@ import org.apache.zest.io.Output;
public interface BackupRestore
{
/**
- * Input that allows data from the entity store to be backed up.
- *
- * @return An Input instance containing the data to back up.
+ * Backup as a stream of serialized entity states, must be closed.
+ */
+ Stream<String> backup();
+
+ /**
+ * Restore from a stream of serialized entity states.
*/
- Input<String, IOException> backup();
+ void restore( Stream<String> states );
/**
- * Output that allows data to be restored from a backup.
+ * Restore from streams of serialized entity states.
*
- * @return An Output instance to receive the restored data.
+ * @return A consumer of streams of serialized entity states
*/
- Output<String, IOException> restore();
+ default Consumer<Stream<String>> restore()
+ {
+ return this::restore;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java
index 71752b0..2e002c2 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java
@@ -20,9 +20,9 @@
package org.apache.zest.spi.entitystore;
import java.time.Instant;
+import java.util.stream.Stream;
import org.apache.zest.api.structure.ModuleDescriptor;
import org.apache.zest.api.usecase.Usecase;
-import org.apache.zest.io.Input;
import org.apache.zest.spi.entity.EntityState;
/**
@@ -32,5 +32,11 @@ public interface EntityStore
{
EntityStoreUnitOfWork newUnitOfWork( ModuleDescriptor module, Usecase usecase, Instant currentTime );
- Input<EntityState, EntityStoreException> entityStates( ModuleDescriptor module );
+ /**
+ * Stream of all entity states, must be closed.
+ *
+ * @param module Module
+ * @return Stream of all entity states, must be closed
+ */
+ Stream<EntityState> entityStates( ModuleDescriptor module );
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java
index 90d5acb..ebaa2f2 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java
@@ -29,6 +29,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.stream.Stream;
import org.apache.zest.api.cache.CacheOptions;
import org.apache.zest.api.common.Optional;
import org.apache.zest.api.entity.EntityDescriptor;
@@ -47,10 +48,6 @@ import org.apache.zest.api.structure.ModuleDescriptor;
import org.apache.zest.api.unitofwork.NoSuchEntityTypeException;
import org.apache.zest.api.usecase.Usecase;
import org.apache.zest.api.value.ValueSerialization;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.ZestSPI;
import org.apache.zest.spi.cache.Cache;
import org.apache.zest.spi.cache.CachePool;
@@ -310,102 +307,62 @@ public class JSONMapEntityStoreMixin
}
@Override
- public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module )
+ public Stream<EntityState> entityStates( ModuleDescriptor module )
{
- return new Input<EntityState, EntityStoreException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output )
- throws EntityStoreException, ReceiverThrowableType
+ List<EntityState> migrated = new ArrayList<>();
+ return mapEntityStore.entityStates().map(
+ reader ->
{
- output.receiveFrom( new Sender<EntityState, EntityStoreException>()
+ EntityState entity = readEntityState( module, reader );
+ if( entity.status() == EntityStatus.UPDATED )
{
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super EntityState, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, EntityStoreException
+ migrated.add( entity );
+ // Synch back 100 at a time
+ if( migrated.size() > 100 )
{
- final List<EntityState> migrated = new ArrayList<>();
- try
- {
- mapEntityStore.entityStates().transferTo( new Output<Reader, ReceiverThrowableType>()
- {
- @Override
- public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends Reader, SenderThrowableType> sender )
- throws ReceiverThrowableType, SenderThrowableType
- {
- sender.sendTo( new Receiver<Reader, ReceiverThrowableType>()
- {
- @Override
- public void receive( Reader item )
- throws ReceiverThrowableType
- {
- final EntityState entity = readEntityState( module, item );
- if( entity.status() == EntityStatus.UPDATED )
- {
- migrated.add( entity );
-
- // Synch back 100 at a time
- if( migrated.size() > 100 )
- {
- try
- {
- synchMigratedEntities( migrated );
- }
- catch( IOException e )
- {
- throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
- }
- }
- }
- receiver.receive( entity );
- }
- } );
-
- // Synch any remaining migrated entities
- if( !migrated.isEmpty() )
- {
- try
- {
- synchMigratedEntities( migrated );
- }
- catch( IOException e )
- {
- throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
- }
- }
- }
- } );
- }
- catch( IOException e )
- {
- throw new EntityStoreException( e );
- }
+ synchMigratedEntities( migrated );
}
- } );
+ }
+ return entity;
}
- };
+ ).onClose(
+ () ->
+ {
+ // Synch any remaining migrated entities
+ if( !migrated.isEmpty() )
+ {
+ synchMigratedEntities( migrated );
+ }
+ }
+ );
}
private void synchMigratedEntities( final List<EntityState> migratedEntities )
- throws IOException
{
- mapEntityStore.applyChanges( new MapEntityStore.MapChanges()
+ try
{
- @Override
- public void visitMap( MapEntityStore.MapChanger changer )
- throws IOException
+ mapEntityStore.applyChanges( new MapEntityStore.MapChanges()
{
- for( EntityState migratedEntity : migratedEntities )
+ @Override
+ public void visitMap( MapEntityStore.MapChanger changer )
+ throws IOException
{
- JSONEntityState state = (JSONEntityState) migratedEntity;
- try (Writer writer = changer.updateEntity( state.entityReference(), state.entityDescriptor() ))
+ for( EntityState migratedEntity : migratedEntities )
{
- writeEntityState( state, writer, state.version(), state.lastModified() );
+ JSONEntityState state = (JSONEntityState) migratedEntity;
+ try( Writer writer = changer.updateEntity( state.entityReference(), state.entityDescriptor() ) )
+ {
+ writeEntityState( state, writer, state.version(), state.lastModified() );
+ }
}
}
- }
- } );
- migratedEntities.clear();
+ } );
+ migratedEntities.clear();
+ }
+ catch( IOException ex )
+ {
+ throw new EntityStoreException( "Synchronization of Migrated Entities failed.", ex );
+ }
}
protected Identity newUnitOfWorkId()
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java
index 2ec20a0..435494f 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java
@@ -22,9 +22,9 @@ package org.apache.zest.spi.entitystore.helpers;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
+import java.util.stream.Stream;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
-import org.apache.zest.io.Input;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
import org.apache.zest.spi.entitystore.EntityStoreException;
@@ -42,9 +42,9 @@ public interface MapEntityStore
throws EntityStoreException;
/**
- * @return All entities state Readers
+ * @return All entities state Readers, must be closed
*/
- Input<Reader, IOException> entityStates();
+ Stream<Reader> entityStates();
void applyChanges( MapChanges changes )
throws IOException;
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java
index fceae54..8a61ced 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import org.apache.zest.api.common.Optional;
import org.apache.zest.api.common.QualifiedName;
import org.apache.zest.api.entity.EntityDescriptor;
@@ -46,10 +47,6 @@ import org.apache.zest.api.unitofwork.NoSuchEntityTypeException;
import org.apache.zest.api.usecase.Usecase;
import org.apache.zest.api.value.ValueSerialization;
import org.apache.zest.api.value.ValueSerializationException;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.ZestSPI;
import org.apache.zest.spi.entity.EntityState;
import org.apache.zest.spi.entity.EntityStatus;
@@ -200,74 +197,47 @@ public class MapEntityStoreMixin
}
@Override
- public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module )
+ public Stream<EntityState> entityStates( final ModuleDescriptor module )
{
- return new Input<EntityState, EntityStoreException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output )
- throws EntityStoreException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<EntityState, EntityStoreException>()
- {
- @Override
- public <RecThrowableType extends Throwable> void sendTo( final Receiver<? super EntityState, RecThrowableType> receiver )
- throws RecThrowableType, EntityStoreException
- {
- final List<EntityState> migrated = new ArrayList<>();
- try
- {
- mapEntityStore.entityStates().transferTo( new Output<Reader, RecThrowableType>()
- {
- @Override
- public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends Reader, SenderThrowableType> sender )
- throws RecThrowableType, SenderThrowableType
- {
- sender.sendTo( item -> {
- final EntityState entity = readEntityState( module, item );
- if( entity.status() == EntityStatus.UPDATED )
- {
- migrated.add( entity );
-
- // Synch back 100 at a time
- if( migrated.size() > 100 )
- {
- try
- {
- synchMigratedEntities( migrated );
- }
- catch( IOException e )
- {
- throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
- }
- }
- }
- receiver.receive( entity );
- } );
-
- // Synch any remaining migrated entities
- if( !migrated.isEmpty() )
- {
- try
- {
- synchMigratedEntities( migrated );
- }
- catch( IOException e )
- {
- throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
- }
- }
- }
- } );
- }
- catch( IOException e )
- {
- throw new EntityStoreException( e );
- }
- }
- } );
- }
- };
+ List<EntityState> migrated = new ArrayList<>();
+ return mapEntityStore
+ .entityStates()
+ .map( reader ->
+ {
+ EntityState entity = readEntityState( module, reader );
+ if( entity.status() == EntityStatus.UPDATED )
+ {
+ migrated.add( entity );
+ // Synch back 100 at a time
+ if( migrated.size() > 100 )
+ {
+ try
+ {
+ synchMigratedEntities( migrated );
+ }
+ catch( IOException e )
+ {
+ throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
+ }
+ }
+ }
+ return entity;
+ } )
+ .onClose( () ->
+ {
+ // Synch any remaining migrated entities
+ if( !migrated.isEmpty() )
+ {
+ try
+ {
+ synchMigratedEntities( migrated );
+ }
+ catch( IOException e )
+ {
+ throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
+ }
+ }
+ } );
}
private void synchMigratedEntities( final List<EntityState> migratedEntities )
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java
----------------------------------------------------------------------
diff --git a/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java b/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java
index ab05f39..74368ae 100644
--- a/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java
+++ b/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java
@@ -31,6 +31,7 @@ import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import org.apache.zest.api.association.Association;
import org.apache.zest.api.association.ManyAssociation;
import org.apache.zest.api.association.NamedAssociation;
@@ -51,6 +52,7 @@ import org.apache.zest.api.value.ValueBuilder;
import org.apache.zest.api.value.ValueComposite;
import org.apache.zest.bootstrap.AssemblyException;
import org.apache.zest.bootstrap.ModuleAssembly;
+import org.apache.zest.spi.entity.EntityState;
import org.apache.zest.spi.entitystore.EntityStore;
import org.apache.zest.test.AbstractZestTest;
import org.junit.After;
@@ -501,6 +503,36 @@ public abstract class AbstractEntityStoreTest
}
}
+ @Test
+ public void entityStatesSPI()
+ {
+ EntityStore entityStore = serviceFinder.findService( EntityStore.class ).get();
+
+ try( Stream<EntityState> states = entityStore.entityStates( module ) )
+ {
+ assertThat( states.count(), is( 0L ) );
+ }
+
+ UnitOfWork unitOfWork = unitOfWorkFactory.newUnitOfWork();
+ TestEntity newInstance = createEntity( unitOfWork );
+ unitOfWork.complete();
+
+ try( Stream<EntityState> states = entityStore.entityStates( module ) )
+ {
+ assertThat( states.count(), is( 1L ) );
+ }
+
+ unitOfWork = unitOfWorkFactory.newUnitOfWork();
+ TestEntity instance = unitOfWork.get( newInstance );
+ unitOfWork.remove( instance );
+ unitOfWork.complete();
+
+ try( Stream<EntityState> states = entityStore.entityStates( module ) )
+ {
+ assertThat( states.count(), is( 0L ) );
+ }
+ }
+
public interface TestEntity
extends EntityComposite
{
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java b/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java
index 1ab1c53..500f313 100644
--- a/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java
+++ b/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java
@@ -29,16 +29,13 @@ import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
+import java.util.stream.Stream;
import org.apache.zest.api.common.Optional;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
import org.apache.zest.api.injection.scope.Service;
import org.apache.zest.api.injection.scope.This;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.library.fileconfig.FileConfiguration;
import org.apache.zest.spi.entitystore.BackupRestore;
import org.apache.zest.spi.entitystore.EntityAlreadyExistsException;
@@ -231,84 +228,43 @@ public class FileEntityStoreMixin
}
@Override
- public Input<String, IOException> backup()
+ public Stream<Reader> entityStates()
{
- return new Input<String, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<String, IOException>()
- {
- @Override
- public <ThrowableType extends Throwable> void sendTo( Receiver<? super String, ThrowableType> receiver )
- throws ThrowableType, IOException
- {
- for( File sliceDirectory : dataDirectory.listFiles() )
- {
- for( File file : sliceDirectory.listFiles() )
- {
- receiver.receive( fetch( file ) );
- }
- }
- }
- } );
- }
- };
+ return backup().map( StringReader::new );
}
@Override
- public Output<String, IOException> restore()
+ public Stream<String> backup()
{
- return new Output<String, IOException>()
+ if( !dataDirectory.exists() )
{
- @Override
- public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
- throws IOException, SenderThrowableType
- {
- sender.sendTo( new Receiver<String, IOException>()
- {
- @Override
- public void receive( String item )
- throws IOException
- {
- String id = item.substring( "{\"reference\":\"".length() );
- id = id.substring( 0, id.indexOf( '"' ) );
- store( getDataFile( id ), item );
- }
- } );
- }
- };
+ return Stream.of();
+ }
+ try
+ {
+ return java.nio.file.Files.walk( dataDirectory.toPath(), 3 )
+ .skip( 1 )
+ .filter( path -> !"slices".equals( path.getFileName().toString() ) )
+ .map( Path::toFile )
+ .filter( file -> !file.isDirectory() )
+ .map( this::uncheckedFetch );
+ }
+ catch( IOException ex )
+ {
+ throw new EntityStoreException( ex );
+ }
}
@Override
- public Input<Reader, IOException> entityStates()
+ public void restore( final Stream<String> stream )
{
- return new Input<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
+ stream.forEach(
+ item ->
{
- output.receiveFrom( new Sender<Reader, IOException>()
- {
- @Override
- public <ThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ThrowableType> receiver )
- throws ThrowableType, IOException
- {
- for( File sliceDirectory : dataDirectory.listFiles() )
- {
- for( File file : sliceDirectory.listFiles() )
- {
- String state = fetch( file );
- receiver.receive( new StringReader( state ) );
- }
- }
- }
- } );
- }
- };
+ String id = item.substring( "{\"reference\":\"".length() );
+ id = id.substring( 0, id.indexOf( '"' ) );
+ uncheckedStore( getDataFile( id ), item );
+ } );
}
private File getDataFile( String identity )
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java b/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java
index fca8a6b..25a58b7 100644
--- a/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java
+++ b/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java
@@ -23,8 +23,8 @@ import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
-import java.util.Map;
import java.util.Properties;
+import java.util.stream.Stream;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
@@ -39,10 +39,6 @@ import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
import org.apache.zest.spi.entitystore.EntityStoreException;
import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
@@ -197,27 +193,8 @@ public class GeodeEntityStoreMixin
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<Reader, IOException>()
- {
- @Override
- public <RTT extends Throwable> void sendTo( Receiver<? super Reader, RTT> receiver )
- throws RTT, IOException
- {
- for( Map.Entry<String, String> eachEntry : region.entrySet() )
- {
- receiver.receive( new StringReader( eachEntry.getValue() ) );
- }
- }
- } );
- }
- };
+ return region.values().stream().map( StringReader::new );
}
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java b/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java
index f588862..34386f2 100644
--- a/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java
+++ b/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java
@@ -30,16 +30,12 @@ import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
-import java.util.Map;
+import java.util.stream.Stream;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
import org.apache.zest.spi.entitystore.EntityStoreException;
import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
@@ -147,28 +143,9 @@ public class HazelcastEntityStoreMixin
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<Reader, IOException>()
- {
- @Override
- public <RTT extends Throwable> void sendTo( Receiver<? super Reader, RTT> receiver )
- throws RTT, IOException
- {
- for( Map.Entry<String, String> eachEntry : stringMap.entrySet() )
- {
- receiver.receive( new StringReader( eachEntry.getValue() ) );
- }
- }
- } );
- }
- };
+ return stringMap.values().stream().map( StringReader::new );
}
private Config createConfig( HazelcastConfiguration configuration )
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java b/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java
index f8c2ac1..b895177 100644
--- a/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java
+++ b/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java
@@ -26,7 +26,6 @@ import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
@@ -36,15 +35,12 @@ import java.util.Map;
import java.util.Properties;
import java.util.Scanner;
import java.util.Set;
+import java.util.stream.Stream;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
import org.apache.zest.spi.entitystore.EntityStoreException;
import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
@@ -54,7 +50,6 @@ import org.jclouds.apis.Apis;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
-import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.io.Payload;
import org.jclouds.providers.ProviderMetadata;
import org.jclouds.providers.Providers;
@@ -254,52 +249,26 @@ public class JCloudsMapEntityStoreMixin
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom(
- new Sender<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
- {
- for( StorageMetadata stored : storeContext.getBlobStore().list() )
- {
- Payload payload = storeContext.getBlobStore().getBlob( container, stored.getName() ).getPayload();
- if( payload == null )
- {
- throw new EntityNotFoundException( EntityReference.parseEntityReference( stored.getName() ) );
- }
- InputStream input = null;
- try
- {
- input = payload.openStream();
- receiver.receive( new InputStreamReader( input, "UTF-8" ) );
- }
- finally
- {
- if( input != null )
- {
- try
- {
- input.close();
- }
- catch( IOException ignored )
- {
- }
- }
- }
- }
- }
- }
- );
- }
- };
+ return storeContext
+ .getBlobStore().list( container ).stream()
+ .map( metadata ->
+ {
+ Payload payload = storeContext.getBlobStore().getBlob( container, metadata.getName() ).getPayload();
+ if( payload == null )
+ {
+ throw new EntityNotFoundException( EntityReference.parseEntityReference( metadata.getName() ) );
+ }
+ try( InputStream input = payload.openStream() )
+ {
+ String state = new Scanner( input, UTF_8.name() ).useDelimiter( "\\Z" ).next();
+ return (Reader) new StringReader( state );
+ }
+ catch( IOException ex )
+ {
+ throw new EntityStoreException( ex );
+ }
+ } );
}
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java b/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java
index fe3a9cf..19c2b7d 100644
--- a/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java
+++ b/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java
@@ -24,9 +24,16 @@ import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
+import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Properties;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import jdbm.RecordManager;
import jdbm.RecordManagerFactory;
import jdbm.RecordManagerOptions;
@@ -48,10 +55,6 @@ import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.injection.scope.Uses;
import org.apache.zest.api.service.ServiceDescriptor;
import org.apache.zest.io.Files;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.library.fileconfig.FileConfiguration;
import org.apache.zest.library.locking.ReadLock;
import org.apache.zest.library.locking.WriteLock;
@@ -213,198 +216,166 @@ public class JdbmEntityStoreMixin
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- lock.writeLock().lock();
-
- try
- {
- output.receiveFrom( new Sender<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
- {
- final TupleBrowser browser = index.browse();
- final Tuple tuple = new Tuple();
-
- while( browser.getNext( tuple ) )
- {
- Identity id = new StringIdentity( (byte[]) tuple.getKey() );
-
- Long stateIndex = getStateIndex( id );
-
- if( stateIndex == null )
- {
- continue;
- } // Skip this one
-
- byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer );
-
- receiver.receive( new StringReader( new String( serializedState, "UTF-8" ) ) );
- }
- }
- } );
- }
- finally
- {
- lock.writeLock().unlock();
- }
- }
- };
+ return backup().map( StringReader::new );
}
@Override
- public Input<String, IOException> backup()
+ public Stream<String> backup()
{
- return new Input<String, IOException>()
+ lock.writeLock().lock();
+ TupleBrowser browser;
+ try
{
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
+ browser = index.browse();
+ }
+ catch( IOException ex )
+ {
+ lock.writeLock().unlock();
+ throw new EntityStoreException( ex );
+ }
+ return StreamSupport.stream(
+ new Spliterators.AbstractSpliterator<String>( Long.MAX_VALUE, Spliterator.ORDERED )
{
- lock.readLock().lock();
+ private final Tuple tuple = new Tuple();
- try
+ @Override
+ public boolean tryAdvance( final Consumer<? super String> action )
{
- output.receiveFrom( new Sender<String, IOException>()
+ try
{
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super String, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
+ if( !browser.getNext( tuple ) )
{
- final TupleBrowser browser = index.browse();
- final Tuple tuple = new Tuple();
-
- while( browser.getNext( tuple ) )
- {
- String id = new String( (byte[]) tuple.getKey(), "UTF-8" );
-
- Long stateIndex = getStateIndex( new StringIdentity( id ) );
-
- if( stateIndex == null )
- {
- continue;
- } // Skip this one
-
- byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer );
-
- receiver.receive( new String( serializedState, "UTF-8" ) );
- }
+ return false;
}
- } );
- }
- finally
- {
- lock.readLock().unlock();
+ Identity identity = new StringIdentity( (byte[]) tuple.getKey() );
+ Long stateIndex = getStateIndex( identity );
+ if( stateIndex == null )
+ {
+ return false;
+ }
+ byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer );
+ String state = new String( serializedState, "UTF-8" );
+ action.accept( state );
+ return true;
+ }
+ catch( IOException ex )
+ {
+ lock.writeLock().unlock();
+ throw new EntityStoreException( ex );
+ }
}
- }
- };
+ },
+ false
+ ).onClose( () -> lock.writeLock().unlock() );
}
@Override
- public Output<String, IOException> restore()
+ public void restore( final Stream<String> states )
{
- return new Output<String, IOException>()
+ File dbFile = new File( getDatabaseName() + ".db" );
+ File lgFile = new File( getDatabaseName() + ".lg" );
+
+ // Create temporary store
+ File tempDatabase = Files.createTemporayFileOf( dbFile );
+ final RecordManager recordManager;
+ final BTree index;
+ try
+ {
+ recordManager = RecordManagerFactory.createRecordManager( tempDatabase.getAbsolutePath(),
+ new Properties() );
+ ByteArrayComparator comparator = new ByteArrayComparator();
+ index = BTree.createInstance( recordManager, comparator, serializer, DefaultSerializer.INSTANCE, 16 );
+ recordManager.setNamedObject( "index", index.getRecid() );
+ recordManager.commit();
+ }
+ catch( IOException ex )
+ {
+ throw new EntityStoreException( ex );
+ }
+ try
{
- @Override
- public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
- throws IOException, SenderThrowableType
+ // TODO NO NEED TO SYNCHRONIZE HERE
+ AtomicLong counter = new AtomicLong();
+ Consumer<String> stateConsumer = state ->
{
- File dbFile = new File( getDatabaseName() + ".db" );
- File lgFile = new File( getDatabaseName() + ".lg" );
-
- // Create temporary store
- File tempDatabase = Files.createTemporayFileOf( dbFile );
-
- final RecordManager recordManager = RecordManagerFactory.createRecordManager( tempDatabase.getAbsolutePath(), new Properties() );
- ByteArrayComparator comparator = new ByteArrayComparator();
- final BTree index = BTree.createInstance( recordManager, comparator, serializer, DefaultSerializer.INSTANCE, 16 );
- recordManager.setNamedObject( "index", index.getRecid() );
- recordManager.commit();
-
try
{
- sender.sendTo( new Receiver<String, IOException>()
+ // Commit one batch
+ if( ( counter.incrementAndGet() % 1000 ) == 0 )
{
- int counter = 0;
-
- @Override
- public void receive( String item )
- throws IOException
- {
- // Commit one batch
- if( ( counter++ % 1000 ) == 0 )
- {
- recordManager.commit();
- }
+ recordManager.commit();
+ }
- String id = item.substring( "{\"reference\":\"".length() );
- id = id.substring( 0, id.indexOf( '"' ) );
+ String id = state.substring( "{\"reference\":\"".length() );
+ id = id.substring( 0, id.indexOf( '"' ) );
- // Insert
- byte[] stateArray = item.getBytes( "UTF-8" );
- long stateIndex = recordManager.insert( stateArray, serializer );
- index.insert( id.getBytes( "UTF-8" ), stateIndex, false );
- }
- } );
+ // Insert
+ byte[] stateArray = state.getBytes( "UTF-8" );
+ long stateIndex = recordManager.insert( stateArray, serializer );
+ index.insert( id.getBytes( "UTF-8" ), stateIndex, false );
}
- catch( IOException e )
+ catch( IOException ex )
{
- recordManager.close();
- tempDatabase.delete();
- throw e;
+ throw new UncheckedIOException( ex );
}
- catch( Throwable senderThrowableType )
- {
- recordManager.close();
- tempDatabase.delete();
- throw (SenderThrowableType) senderThrowableType;
- }
-
- // Import went ok - continue
- recordManager.commit();
- // close file handles otherwise Microsoft Windows will fail to rename database files.
+ };
+ states.forEach( stateConsumer );
+ // Import went ok - continue
+ recordManager.commit();
+ // close file handles otherwise Microsoft Windows will fail to rename database files.
+ recordManager.close();
+ }
+ catch( IOException | UncheckedIOException ex )
+ {
+ try
+ {
recordManager.close();
+ }
+ catch( IOException ignore ) { }
+ tempDatabase.delete();
+ throw new EntityStoreException( ex );
+ }
+ try
+ {
- lock.writeLock().lock();
- try
- {
- // Replace old database with new
- JdbmEntityStoreMixin.this.recordManager.close();
-
- boolean deletedOldDatabase = true;
- deletedOldDatabase &= dbFile.delete();
- deletedOldDatabase &= lgFile.delete();
- if( !deletedOldDatabase )
- {
- throw new IOException( "Could not remove old database" );
- }
+ lock.writeLock().lock();
+ try
+ {
+ // Replace old database with new
+ JdbmEntityStoreMixin.this.recordManager.close();
- boolean renamedTempDatabase = true;
- renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".db" ).renameTo( dbFile );
- renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".lg" ).renameTo( lgFile );
+ boolean deletedOldDatabase = true;
+ deletedOldDatabase &= dbFile.delete();
+ deletedOldDatabase &= lgFile.delete();
+ if( !deletedOldDatabase )
+ {
+ throw new EntityStoreException( "Could not remove old database" );
+ }
- if( !renamedTempDatabase )
- {
- throw new IOException( "Could not replace database with temp database" );
- }
+ boolean renamedTempDatabase = true;
+ renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".db" ).renameTo( dbFile );
+ renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".lg" ).renameTo( lgFile );
- // Start up again
- initialize();
- }
- finally
+ if( !renamedTempDatabase )
{
- lock.writeLock().unlock();
+ throw new EntityStoreException( "Could not replace database with temp database" );
}
+
+ // Start up again
+ initialize();
}
- };
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+ catch( IOException ex )
+ {
+ tempDatabase.delete();
+ throw new EntityStoreException( ex );
+ }
}
private String getDatabaseName()
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java b/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java
index 638b021..7e485b2 100644
--- a/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java
+++ b/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java
@@ -26,6 +26,11 @@ import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.charset.Charset;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
@@ -34,10 +39,6 @@ import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.injection.scope.Uses;
import org.apache.zest.api.service.ServiceActivation;
import org.apache.zest.api.service.ServiceDescriptor;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.library.fileconfig.FileConfiguration;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
import org.apache.zest.spi.entitystore.EntityStoreException;
@@ -197,42 +198,38 @@ public class LevelDBEntityStoreMixin
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
- {
-
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
+ DBIterator iterator = db.iterator();
+ iterator.seekToFirst();
+ return StreamSupport.stream(
+ new Spliterators.AbstractSpliterator<Reader>( Long.MAX_VALUE, Spliterator.ORDERED )
{
- output.receiveFrom( new Sender<Reader, IOException>()
+ @Override
+ public boolean tryAdvance( final Consumer<? super Reader> action )
{
-
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
+ if( !iterator.hasNext() )
{
- DBIterator iterator = db.iterator();
- try
- {
- for( iterator.seekToFirst(); iterator.hasNext(); iterator.next() )
- {
- byte[] state = iterator.peekNext().getValue();
- String jsonState = new String( state, charset );
- receiver.receive( new StringReader( jsonState ) );
- }
- }
- finally
- {
- iterator.close();
- }
+ return false;
}
-
- } );
+ action.accept( new StringReader( new String( iterator.next().getValue(), charset ) ) );
+ return true;
+ }
+ },
+ false
+ ).onClose(
+ () ->
+ {
+ try
+ {
+ iterator.close();
+ }
+ catch( IOException ex )
+ {
+ throw new EntityStoreException( "Unable to close DB iterator" );
+ }
}
-
- };
+ );
}
@Override
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java b/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java
index 9b988f5..ee97171 100644
--- a/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java
+++ b/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java
@@ -25,7 +25,6 @@ import com.mongodb.MongoClientOptions;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.WriteConcern;
-import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
@@ -40,15 +39,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
import org.apache.zest.spi.entitystore.EntityStoreException;
import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
@@ -289,33 +286,16 @@ public class MongoMapEntityStoreMixin
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo(
- Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo(
- Receiver<? super Reader, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
- {
- FindIterable<Document> cursor = db.getCollection( collectionName ).find();
- for( Document eachEntity : cursor )
- {
- Document bsonState = (Document) eachEntity.get( STATE_COLUMN );
- String jsonState = JSON.serialize( bsonState );
- receiver.receive( new StringReader( jsonState ) );
- }
- }
- } );
- }
- };
+ return StreamSupport
+ .stream( db.getCollection( collectionName ).find().spliterator(), false )
+ .map( eachEntity ->
+ {
+ Document bsonState = (Document) eachEntity.get( STATE_COLUMN );
+ String jsonState = JSON.serialize( bsonState );
+ return new StringReader( jsonState );
+ } );
}
private Bson byIdentity( EntityReference entityReference )
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java b/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java
index f76f8d3..09918a2 100644
--- a/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java
+++ b/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.prefs.BackingStoreException;
import java.util.prefs.Preferences;
+import java.util.stream.Stream;
import org.apache.zest.api.cache.CacheOptions;
import org.apache.zest.api.common.QualifiedName;
import org.apache.zest.api.entity.EntityDescriptor;
@@ -57,10 +58,6 @@ import org.apache.zest.api.usecase.Usecase;
import org.apache.zest.api.usecase.UsecaseBuilder;
import org.apache.zest.api.value.ValueSerialization;
import org.apache.zest.api.value.ValueSerializationException;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.ZestSPI;
import org.apache.zest.spi.entity.EntityState;
import org.apache.zest.spi.entity.EntityStatus;
@@ -175,43 +172,23 @@ public class PreferencesEntityStoreMixin
}
@Override
- public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module )
+ public Stream<EntityState> entityStates( final ModuleDescriptor module )
{
- return new Input<EntityState, EntityStoreException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output )
- throws EntityStoreException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<EntityState, EntityStoreException>()
- {
- @Override
- public <RecThrowableType extends Throwable> void sendTo( Receiver<? super EntityState, RecThrowableType> receiver )
- throws RecThrowableType, EntityStoreException
- {
- UsecaseBuilder builder = UsecaseBuilder.buildUsecase( "zest.entitystore.preferences.visit" );
- Usecase visitUsecase = builder.withMetaInfo( CacheOptions.NEVER ).newUsecase();
- final EntityStoreUnitOfWork uow =
- newUnitOfWork( module, visitUsecase, SystemTime.now() );
+ UsecaseBuilder builder = UsecaseBuilder.buildUsecase( "zest.entitystore.preferences.visit" );
+ Usecase visitUsecase = builder.withMetaInfo( CacheOptions.NEVER ).newUsecase();
+ EntityStoreUnitOfWork uow = newUnitOfWork( module, visitUsecase, SystemTime.now() );
- try
- {
- String[] identities = root.childrenNames();
- for( String identity : identities )
- {
- EntityReference reference = EntityReference.parseEntityReference( identity );
- EntityState entityState = uow.entityStateOf( module, reference );
- receiver.receive( entityState );
- }
- }
- catch( BackingStoreException e )
- {
- throw new EntityStoreException( e );
- }
- }
- } );
- }
- };
+ try
+ {
+ return Stream.of( root.childrenNames() )
+ .map( EntityReference::parseEntityReference )
+ .map( ref -> uow.entityStateOf( module, ref ) )
+ .onClose( uow::discard );
+ }
+ catch( BackingStoreException e )
+ {
+ throw new EntityStoreException( e );
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java b/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java
index 4113e39..9bc2446 100644
--- a/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java
+++ b/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java
@@ -24,16 +24,12 @@ import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
-import java.util.Set;
+import java.util.stream.Stream;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.entitystore.EntityAlreadyExistsException;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
import org.apache.zest.spi.entitystore.EntityStoreException;
@@ -164,38 +160,12 @@ public class RedisMapEntityStoreMixin
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
- {
- Jedis jedis = pool.getResource();
- try
- {
- Set<String> keys = jedis.keys( "*" );
- for( String key : keys )
- {
- String jsonState = jedis.get( key );
- receiver.receive( new StringReader( jsonState ) );
- }
- }
- finally
- {
- pool.returnResource( jedis );
- }
- }
- } );
- }
- };
+ Jedis jedis = pool.getResource();
+ return jedis.keys( "*" ).stream()
+ .map( key -> (Reader) new StringReader( jedis.get( key ) ) )
+ .onClose( jedis::close );
}
private static boolean notFound( String jsonState )
http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java b/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java
index 0160dfa..fcda8fa 100644
--- a/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java
+++ b/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java
@@ -28,20 +28,6 @@ import com.basho.riak.client.core.RiakNode;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.core.util.HostAndPort;
-import org.apache.zest.api.common.InvalidApplicationException;
-import org.apache.zest.api.configuration.Configuration;
-import org.apache.zest.api.entity.EntityDescriptor;
-import org.apache.zest.api.entity.EntityReference;
-import org.apache.zest.api.injection.scope.This;
-import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
-import org.apache.zest.spi.entitystore.EntityNotFoundException;
-import org.apache.zest.spi.entitystore.EntityStoreException;
-import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -57,6 +43,17 @@ import java.security.Security;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.zest.api.common.InvalidApplicationException;
+import org.apache.zest.api.configuration.Configuration;
+import org.apache.zest.api.entity.EntityDescriptor;
+import org.apache.zest.api.entity.EntityReference;
+import org.apache.zest.api.injection.scope.This;
+import org.apache.zest.api.service.ServiceActivation;
+import org.apache.zest.spi.entitystore.EntityNotFoundException;
+import org.apache.zest.spi.entitystore.EntityStoreException;
+import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
/**
* Riak Protobuf implementation of MapEntityStore.
@@ -334,40 +331,33 @@ public class RiakMapEntityStoreMixin implements ServiceActivation, MapEntityStor
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
+ try
{
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
- {
- try
- {
- ListKeys listKeys = new ListKeys.Builder( namespace ).build();
- ListKeys.Response listKeysResponse = riakClient.execute( listKeys );
- for( Location location : listKeysResponse )
- {
- FetchValue fetch = new FetchValue.Builder( location ).build();
- FetchValue.Response response = riakClient.execute( fetch );
- String jsonState = response.getValue( String.class );
- receiver.receive( new StringReader( jsonState ) );
- }
- }
- catch( InterruptedException | ExecutionException ex )
- {
- throw new EntityStoreException( "Unable to apply entity changes.", ex );
- }
- }
- } );
- }
- };
+ ListKeys listKeys = new ListKeys.Builder( namespace ).build();
+ ListKeys.Response listKeysResponse = riakClient.execute( listKeys );
+ return StreamSupport
+ .stream( listKeysResponse.spliterator(), false )
+ .map( location ->
+ {
+ try
+ {
+ FetchValue fetch = new FetchValue.Builder( location ).build();
+ FetchValue.Response response = riakClient.execute( fetch );
+ String jsonState = response.getValue( String.class );
+ return new StringReader( jsonState );
+ }
+ catch( InterruptedException | ExecutionException ex )
+ {
+ throw new EntityStoreException( "Unable to get entity states.", ex );
+ }
+ } );
+ }
+ catch( InterruptedException | ExecutionException ex )
+ {
+ throw new EntityStoreException( "Unable to get entity states.", ex );
+ }
}