You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@polygene.apache.org by pa...@apache.org on 2016/12/03 11:20:10 UTC
[1/2] zest-java git commit: entitystores: replace usage of core/io with streams
Repository: zest-java
Updated Branches:
refs/heads/replace-io-by-streams [created] eb4e31a97
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-sql/src/main/java/org/apache/zest/entitystore/sql/SQLEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-sql/src/main/java/org/apache/zest/entitystore/sql/SQLEntityStoreMixin.java b/extensions/entitystore-sql/src/main/java/org/apache/zest/entitystore/sql/SQLEntityStoreMixin.java
index a52b526..fa66da2 100644
--- a/extensions/entitystore-sql/src/main/java/org/apache/zest/entitystore/sql/SQLEntityStoreMixin.java
+++ b/extensions/entitystore-sql/src/main/java/org/apache/zest/entitystore/sql/SQLEntityStoreMixin.java
@@ -34,8 +34,13 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Spliterator;
+import java.util.Spliterators;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import org.apache.zest.api.common.Optional;
import org.apache.zest.api.common.QualifiedName;
import org.apache.zest.api.entity.EntityDescriptor;
@@ -57,11 +62,6 @@ import org.apache.zest.entitystore.sql.internal.DatabaseSQLService;
import org.apache.zest.entitystore.sql.internal.DatabaseSQLService.EntityValueResult;
import org.apache.zest.entitystore.sql.internal.SQLEntityState;
import org.apache.zest.entitystore.sql.internal.SQLEntityState.DefaultSQLEntityState;
-import org.apache.zest.functional.Visitor;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.library.sql.common.SQLUtil;
import org.apache.zest.spi.ZestSPI;
import org.apache.zest.spi.entity.EntityState;
@@ -272,7 +272,8 @@ public class SQLEntityStoreMixin
EntityDescriptor entityDescriptor
)
{
- return new DefaultSQLEntityState( new DefaultEntityState( unitOfWork.currentTime(), entityRef, entityDescriptor ) );
+ return new DefaultSQLEntityState(
+ new DefaultEntityState( unitOfWork.currentTime(), entityRef, entityDescriptor ) );
}
@Override
@@ -282,77 +283,56 @@ public class SQLEntityStoreMixin
}
@Override
- public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module )
+ public Stream<EntityState> entityStates( final ModuleDescriptor module )
{
- return new Input<EntityState, EntityStoreException>()
+ try
{
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output )
- throws EntityStoreException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<EntityState, EntityStoreException>()
+ Connection connection = database.getConnection();
+ PreparedStatement ps = database.prepareGetAllEntitiesStatement( connection );
+ database.populateGetAllEntitiesStatement( ps );
+ ResultSet rs = ps.executeQuery();
+ return StreamSupport.stream(
+ new Spliterators.AbstractSpliterator<EntityState>( Long.MAX_VALUE, Spliterator.ORDERED )
{
@Override
- public <RecThrowableType extends Throwable> void sendTo( final Receiver<? super EntityState, RecThrowableType> receiver )
- throws RecThrowableType, EntityStoreException
+ public boolean tryAdvance( final Consumer<? super EntityState> action )
{
- queryAllEntities( module, visited -> {
- try
- {
- receiver.receive( visited );
- }
- catch( Throwable receiverThrowableType )
- {
- throw new SQLException( receiverThrowableType );
- }
+ try
+ {
+ if( !rs.next() ) { return false; }
+ EntityState entityState = readEntityState( module,
+ database.getEntityValue( rs ).getReader() );
+ action.accept( entityState );
return true;
- } );
+ }
+ catch( SQLException ex )
+ {
+ SQLUtil.closeQuietly( rs, ex );
+ SQLUtil.closeQuietly( ps, ex );
+ SQLUtil.closeQuietly( connection, ex );
+ throw new EntityStoreException( ex );
+ }
}
- } );
- }
- };
- }
-
- private void queryAllEntities( ModuleDescriptor module, EntityStatesVisitor entityStatesVisitor )
- {
- Connection connection = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
- try
- {
- connection = database.getConnection();
- ps = database.prepareGetAllEntitiesStatement( connection );
- database.populateGetAllEntitiesStatement( ps );
- rs = ps.executeQuery();
- while( rs.next() )
- {
- DefaultEntityState entityState = readEntityState( module, database.getEntityValue( rs ).getReader() );
- if( !entityStatesVisitor.visit( entityState ) )
+ },
+ false
+ ).onClose(
+ () ->
{
- return;
+ SQLUtil.closeQuietly( rs );
+ SQLUtil.closeQuietly( ps );
+ SQLUtil.closeQuietly( connection );
}
- }
+ );
}
catch( SQLException ex )
{
throw new EntityStoreException( ex );
}
- finally
- {
- SQLUtil.closeQuietly( rs );
- SQLUtil.closeQuietly( ps );
- SQLUtil.closeQuietly( connection );
- }
- }
-
- private interface EntityStatesVisitor
- extends Visitor<EntityState, SQLException>
- {
}
protected Identity newUnitOfWorkId()
{
- return identityGenerator.generate(EntityStore.class);
+ return identityGenerator.generate( EntityStore.class );
}
protected DefaultEntityState readEntityState( ModuleDescriptor module, Reader entityState )
@@ -364,7 +344,7 @@ public class SQLEntityStoreMixin
final EntityStatus[] status = { EntityStatus.LOADED };
String version = jsonObject.getString( JSONKeys.VERSION );
- Instant modified = Instant.ofEpochMilli(jsonObject.getLong( JSONKeys.MODIFIED ));
+ Instant modified = Instant.ofEpochMilli( jsonObject.getLong( JSONKeys.MODIFIED ) );
String identity = jsonObject.getString( JSONKeys.IDENTITY );
// Check if version is correct
@@ -398,102 +378,117 @@ public class SQLEntityStoreMixin
Map<QualifiedName, Object> properties = new HashMap<>();
JSONObject props = jsonObject.getJSONObject( JSONKeys.PROPERTIES );
- entityDescriptor.state().properties().forEach( propertyDescriptor -> {
- Object jsonValue;
- try
+ entityDescriptor.state().properties().forEach(
+ propertyDescriptor ->
{
- jsonValue = props.get( propertyDescriptor.qualifiedName().name() );
- if( JSONObject.NULL.equals( jsonValue ) )
+ Object jsonValue;
+ try
{
- properties.put( propertyDescriptor.qualifiedName(), null );
+ jsonValue = props.get(
+ propertyDescriptor.qualifiedName().name() );
+ if( JSONObject.NULL.equals( jsonValue ) )
+ {
+ properties.put( propertyDescriptor.qualifiedName(), null );
+ }
+ else
+ {
+ Object value = valueSerialization.deserialize( module,
+ propertyDescriptor.valueType(),
+ jsonValue.toString() );
+ properties.put( propertyDescriptor.qualifiedName(), value );
+ }
}
- else
+ catch( JSONException e )
{
- Object value = valueSerialization.deserialize( module, propertyDescriptor.valueType(), jsonValue
- .toString() );
- properties.put( propertyDescriptor.qualifiedName(), value );
+ // Value not found, default it
+ Object initialValue = propertyDescriptor.initialValue( module );
+ properties.put( propertyDescriptor.qualifiedName(), initialValue );
+ status[ 0 ] = EntityStatus.UPDATED;
}
}
- catch( JSONException e )
- {
- // Value not found, default it
- Object initialValue = propertyDescriptor.initialValue( module );
- properties.put( propertyDescriptor.qualifiedName(), initialValue );
- status[ 0 ] = EntityStatus.UPDATED;
- }
- } );
+ );
Map<QualifiedName, EntityReference> associations = new HashMap<>();
JSONObject assocs = jsonObject.getJSONObject( JSONKeys.ASSOCIATIONS );
- entityDescriptor.state().associations().forEach( associationType -> {
- try
+ entityDescriptor.state().associations().forEach(
+ associationType ->
{
- Object jsonValue = assocs.get( associationType.qualifiedName().name() );
- EntityReference value = jsonValue == JSONObject.NULL ? null : EntityReference.parseEntityReference(
- (String) jsonValue );
- associations.put( associationType.qualifiedName(), value );
- }
- catch( JSONException e )
- {
- // Association not found, default it to null
- associations.put( associationType.qualifiedName(), null );
- status[ 0 ] = EntityStatus.UPDATED;
+ try
+ {
+ Object jsonValue = assocs.get( associationType.qualifiedName().name() );
+ EntityReference value = jsonValue == JSONObject.NULL
+ ? null
+ : EntityReference.parseEntityReference( (String) jsonValue );
+ associations.put( associationType.qualifiedName(), value );
+ }
+ catch( JSONException e )
+ {
+ // Association not found, default it to null
+ associations.put( associationType.qualifiedName(), null );
+ status[ 0 ] = EntityStatus.UPDATED;
+ }
}
- } );
+ );
JSONObject manyAssocs = jsonObject.getJSONObject( JSONKeys.MANY_ASSOCIATIONS );
Map<QualifiedName, List<EntityReference>> manyAssociations = new HashMap<>();
- entityDescriptor.state().manyAssociations().forEach( manyAssociationType -> {
- List<EntityReference> references = new ArrayList<>();
- try
+ entityDescriptor.state().manyAssociations().forEach(
+ manyAssociationType ->
{
- JSONArray jsonValues = manyAssocs.getJSONArray( manyAssociationType.qualifiedName().name() );
- for( int i = 0; i < jsonValues.length(); i++ )
+ List<EntityReference> references = new ArrayList<>();
+ try
+ {
+ JSONArray jsonValues = manyAssocs.getJSONArray( manyAssociationType.qualifiedName().name() );
+ for( int i = 0; i < jsonValues.length(); i++ )
+ {
+ Object jsonValue = jsonValues.getString( i );
+ EntityReference value = jsonValue == JSONObject.NULL
+ ? null
+ : EntityReference.parseEntityReference( (String) jsonValue );
+ references.add( value );
+ }
+ manyAssociations.put( manyAssociationType.qualifiedName(), references );
+ }
+ catch( JSONException e )
{
- Object jsonValue = jsonValues.getString( i );
- EntityReference value = jsonValue == JSONObject.NULL ? null : EntityReference.parseEntityReference(
- (String) jsonValue );
- references.add( value );
+ // ManyAssociation not found, default to empty one
+ manyAssociations.put( manyAssociationType.qualifiedName(), references );
}
- manyAssociations.put( manyAssociationType.qualifiedName(), references );
- }
- catch( JSONException e )
- {
- // ManyAssociation not found, default to empty one
- manyAssociations.put( manyAssociationType.qualifiedName(), references );
- }
- } );
+ } );
JSONObject namedAssocs = jsonObject.has( JSONKeys.NAMED_ASSOCIATIONS )
? jsonObject.getJSONObject( JSONKeys.NAMED_ASSOCIATIONS )
: new JSONObject();
Map<QualifiedName, Map<String, EntityReference>> namedAssociations = new HashMap<>();
- entityDescriptor.state().namedAssociations().forEach( namedAssociationType -> {
- Map<String, EntityReference> references = new LinkedHashMap<>();
- try
+ entityDescriptor.state().namedAssociations().forEach(
+ namedAssociationType ->
{
- JSONObject jsonValues = namedAssocs.getJSONObject( namedAssociationType.qualifiedName().name() );
- JSONArray names = jsonValues.names();
- if( names != null )
+ Map<String, EntityReference> references = new LinkedHashMap<>();
+ try
{
- for( int idx = 0; idx < names.length(); idx++ )
+ JSONObject jsonValues = namedAssocs.getJSONObject( namedAssociationType.qualifiedName().name() );
+ JSONArray names = jsonValues.names();
+ if( names != null )
{
- String name = names.getString( idx );
- String jsonValue = jsonValues.getString( name );
- references.put( name, EntityReference.parseEntityReference( jsonValue ) );
+ for( int idx = 0; idx < names.length(); idx++ )
+ {
+ String name = names.getString( idx );
+ String jsonValue = jsonValues.getString( name );
+ references.put( name, EntityReference.parseEntityReference( jsonValue ) );
+ }
}
+ namedAssociations.put( namedAssociationType.qualifiedName(), references );
}
- namedAssociations.put( namedAssociationType.qualifiedName(), references );
- }
- catch( JSONException e )
- {
- // NamedAssociation not found, default to empty one
- namedAssociations.put( namedAssociationType.qualifiedName(), references );
- }
- } );
+ catch( JSONException e )
+ {
+ // NamedAssociation not found, default to empty one
+ namedAssociations.put( namedAssociationType.qualifiedName(), references );
+ }
+ } );
return new DefaultEntityState( version, modified,
- EntityReference.parseEntityReference( identity ), status[ 0 ], entityDescriptor,
+ EntityReference.parseEntityReference( identity ), status[ 0 ],
+ entityDescriptor,
properties, associations, manyAssociations, namedAssociations );
}
catch( JSONException e )
@@ -507,7 +502,7 @@ public class SQLEntityStoreMixin
throws IOException
{
JSONObject jsonObject;
- try (Reader reader = getValue( EntityReference.parseEntityReference( id ) ).getReader())
+ try( Reader reader = getValue( EntityReference.parseEntityReference( id ) ).getReader() )
{
jsonObject = new JSONObject( new JSONTokener( reader ) );
}
@@ -553,58 +548,65 @@ public class SQLEntityStoreMixin
try
{
JSONWriter json = new JSONWriter( writer );
- JSONWriter properties = json.object().
- key( JSONKeys.IDENTITY ).value( state.entityReference().identity().toString() ).
- key( JSONKeys.APPLICATION_VERSION ).value( application.version() ).
- key( JSONKeys.TYPE ).value( state.entityDescriptor().types().findFirst().get().getName() ).
- key( JSONKeys.VERSION ).value( version ).
- key( JSONKeys.MODIFIED ).value( state.lastModified().toEpochMilli() ).
- key( JSONKeys.PROPERTIES ).object();
-
- state.entityDescriptor().state().properties().forEach( persistentProperty -> {
- try
+ JSONWriter properties = json.object()
+ .key( JSONKeys.IDENTITY )
+ .value( state.entityReference().identity().toString() )
+ .key( JSONKeys.APPLICATION_VERSION )
+ .value( application.version() )
+ .key( JSONKeys.TYPE )
+ .value( state.entityDescriptor().types().findFirst().get().getName() )
+ .key( JSONKeys.VERSION )
+ .value( version )
+ .key( JSONKeys.MODIFIED )
+ .value( state.lastModified().toEpochMilli() )
+ .key( JSONKeys.PROPERTIES )
+ .object();
+
+ state.entityDescriptor().state().properties().forEach(
+ persistentProperty ->
{
- Object value = state.properties().get( persistentProperty.qualifiedName() );
- json.key( persistentProperty.qualifiedName().name() );
- if( value == null || ValueType.isPrimitiveValue( value ) )
+ try
{
- json.value( value );
- }
- else
- {
- String serialized = valueSerialization.serialize( value );
- if( serialized.startsWith( "{" ) )
+ Object value = state.properties().get( persistentProperty.qualifiedName() );
+ json.key( persistentProperty.qualifiedName().name() );
+ if( value == null || ValueType.isPrimitiveValue( value ) )
{
- json.value( new JSONObject( serialized ) );
- }
- else if( serialized.startsWith( "[" ) )
- {
- json.value( new JSONArray( serialized ) );
+ json.value( value );
}
else
{
- json.value( serialized );
+ String serialized = valueSerialization.serialize( value );
+ if( serialized.startsWith( "{" ) )
+ {
+ json.value( new JSONObject( serialized ) );
+ }
+ else if( serialized.startsWith( "[" ) )
+ {
+ json.value( new JSONArray( serialized ) );
+ }
+ else
+ {
+ json.value( serialized );
+ }
}
}
- }
- catch( JSONException e )
- {
- throw new EntityStoreException( "Could not store EntityState", e );
- }
- } );
+ catch( JSONException e )
+ {
+ throw new EntityStoreException(
+ "Could not store EntityState", e );
+ }
+ } );
JSONWriter associations = properties.endObject().key( JSONKeys.ASSOCIATIONS ).object();
- for( Map.Entry<QualifiedName, EntityReference> stateNameEntityReferenceEntry : state.associations()
- .entrySet() )
+ for( Map.Entry<QualifiedName, EntityReference> stateNameEntityRefEntry : state.associations().entrySet() )
{
- EntityReference value = stateNameEntityReferenceEntry.getValue();
- associations.key( stateNameEntityReferenceEntry.getKey().name() ).
- value( value != null ? value.identity().toString() : null );
+ EntityReference value = stateNameEntityRefEntry.getValue();
+ associations.key( stateNameEntityRefEntry.getKey().name() )
+ .value( value != null ? value.identity().toString() : null );
}
JSONWriter manyAssociations = associations.endObject().key( JSONKeys.MANY_ASSOCIATIONS ).object();
- for( Map.Entry<QualifiedName, List<EntityReference>> stateNameListEntry : state.manyAssociations()
- .entrySet() )
+ for( Map.Entry<QualifiedName, List<EntityReference>> stateNameListEntry : state.manyAssociations().entrySet() )
{
JSONWriter assocs = manyAssociations.key( stateNameListEntry.getKey().name() ).array();
for( EntityReference entityReference : stateNameListEntry.getValue() )
@@ -615,8 +617,7 @@ public class SQLEntityStoreMixin
}
JSONWriter namedAssociations = manyAssociations.endObject().key( JSONKeys.NAMED_ASSOCIATIONS ).object();
- for( Map.Entry<QualifiedName, Map<String, EntityReference>> stateNameMapEntry : state.namedAssociations()
- .entrySet() )
+ for( Map.Entry<QualifiedName, Map<String, EntityReference>> stateNameMapEntry : state.namedAssociations().entrySet() )
{
JSONWriter assocs = namedAssociations.key( stateNameMapEntry.getKey().name() ).object();
for( Map.Entry<String, EntityReference> entry : stateNameMapEntry.getValue().entrySet() )
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/migration/src/test/java/org/apache/zest/migration/MigrationTest.java
----------------------------------------------------------------------
diff --git a/extensions/migration/src/test/java/org/apache/zest/migration/MigrationTest.java b/extensions/migration/src/test/java/org/apache/zest/migration/MigrationTest.java
index 1da1f7b..7f8cb54 100644
--- a/extensions/migration/src/test/java/org/apache/zest/migration/MigrationTest.java
+++ b/extensions/migration/src/test/java/org/apache/zest/migration/MigrationTest.java
@@ -19,26 +19,18 @@
*/
package org.apache.zest.migration;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.StringReader;
-import org.apache.zest.api.identity.Identity;
-import org.apache.zest.bootstrap.unitofwork.DefaultUnitOfWorkAssembler;
-import org.hamcrest.CoreMatchers;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.junit.Test;
+import java.util.List;
+import java.util.stream.Stream;
import org.apache.zest.api.activation.ActivationException;
+import org.apache.zest.api.identity.Identity;
import org.apache.zest.api.service.importer.NewObjectImporter;
import org.apache.zest.api.unitofwork.UnitOfWork;
import org.apache.zest.api.unitofwork.UnitOfWorkCompletionException;
import org.apache.zest.bootstrap.AssemblyException;
import org.apache.zest.bootstrap.ModuleAssembly;
import org.apache.zest.bootstrap.SingletonAssembler;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
+import org.apache.zest.bootstrap.unitofwork.DefaultUnitOfWorkAssembler;
import org.apache.zest.migration.assembly.EntityMigrationOperation;
import org.apache.zest.migration.assembly.MigrationBuilder;
import org.apache.zest.migration.assembly.MigrationOperation;
@@ -47,7 +39,12 @@ import org.apache.zest.spi.entitystore.helpers.JSONKeys;
import org.apache.zest.spi.entitystore.helpers.StateStore;
import org.apache.zest.test.AbstractZestTest;
import org.apache.zest.test.EntityTestAssembler;
+import org.hamcrest.CoreMatchers;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.junit.Test;
+import static java.util.stream.Collectors.toList;
import static org.junit.Assert.assertThat;
/**
@@ -106,7 +103,7 @@ public class MigrationTest
{
Identity id;
// Set up version 1
- StringInputOutput data_v1 = new StringInputOutput();
+ List<String> data_v1;
{
SingletonAssembler v1 = new SingletonAssembler()
{
@@ -130,11 +127,14 @@ public class MigrationTest
BackupRestore backupRestore = v1.module()
.findService( BackupRestore.class )
.get();
- backupRestore.backup().transferTo( data_v1 );
+ try( Stream<String> backup = backupRestore.backup() )
+ {
+ data_v1 = backup.collect( toList() );
+ }
}
// Set up version 1.1
- StringInputOutput data_v1_1 = new StringInputOutput();
+ List<String> data_v1_1;
{
SingletonAssembler v1_1 = new SingletonAssembler()
{
@@ -148,7 +148,7 @@ public class MigrationTest
};
BackupRestore testData = v1_1.module().findService( BackupRestore.class ).get();
- data_v1.transferTo( testData.restore() );
+ testData.restore( data_v1.stream() );
UnitOfWork uow = v1_1.module().unitOfWorkFactory().newUnitOfWork();
TestEntity1_1 entity = uow.get( TestEntity1_1.class, id );
@@ -157,7 +157,10 @@ public class MigrationTest
assertThat( "Association has been renamed", entity.newFooAssoc().get(), CoreMatchers.equalTo( entity ) );
uow.complete();
- testData.backup().transferTo( data_v1_1 );
+ try( Stream<String> backup = testData.backup() )
+ {
+ data_v1_1 = backup.collect( toList() );
+ }
}
// Set up version 2.0
@@ -177,7 +180,7 @@ public class MigrationTest
// Test migration from 1.0 -> 2.0
{
- data_v1.transferTo( testData.restore() );
+ testData.restore( data_v1.stream() );
UnitOfWork uow = v2_0.module().unitOfWorkFactory().newUnitOfWork();
TestEntity2_0 entity = uow.get( TestEntity2_0.class, id );
assertThat( "Property has been created", entity.bar().get(), CoreMatchers.equalTo( "Some value" ) );
@@ -202,11 +205,11 @@ public class MigrationTest
};
BackupRestore testData = v3_0.module().findService( BackupRestore.class ).get();
- data_v1_1.transferTo( testData.restore() );
+ testData.restore( data_v1_1.stream() );
// Test migration from 1.0 -> 3.0
{
- data_v1.transferTo( testData.restore() );
+ testData.restore( data_v1.stream() );
UnitOfWork uow = v3_0.module().unitOfWorkFactory().newUnitOfWork();
org.apache.zest.migration.moved.TestEntity2_0 entity = uow.get( org.apache.zest.migration.moved.TestEntity2_0.class, id );
uow.complete();
@@ -258,51 +261,4 @@ public class MigrationTest
System.out.println( msg );
}
}
-
- private static class StringInputOutput
- implements Output<String, IOException>, Input<String, IOException>
- {
- final StringBuilder builder = new StringBuilder();
-
- @Override
- public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
- throws IOException, SenderThrowableType
- {
- sender.sendTo((Receiver<String, IOException>) item -> builder.append( item ).append( "\n" ));
- }
-
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<String, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super String, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
- {
- BufferedReader reader = new BufferedReader( new StringReader( builder.toString() ) );
- String line;
- try
- {
- while( ( line = reader.readLine() ) != null )
- {
- receiver.receive( line );
- }
- }
- finally
- {
- reader.close();
- }
- }
- } );
- }
-
- @Override
- public String toString()
- {
- return builder.toString();
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/reindexer/src/main/java/org/apache/zest/index/reindexer/internal/ReindexerMixin.java
----------------------------------------------------------------------
diff --git a/extensions/reindexer/src/main/java/org/apache/zest/index/reindexer/internal/ReindexerMixin.java b/extensions/reindexer/src/main/java/org/apache/zest/index/reindexer/internal/ReindexerMixin.java
index c3a3af4..f11dc77 100644
--- a/extensions/reindexer/src/main/java/org/apache/zest/index/reindexer/internal/ReindexerMixin.java
+++ b/extensions/reindexer/src/main/java/org/apache/zest/index/reindexer/internal/ReindexerMixin.java
@@ -21,7 +21,7 @@
package org.apache.zest.index.reindexer.internal;
import java.util.ArrayList;
-import org.apache.zest.api.common.QualifiedName;
+import java.util.stream.Stream;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.identity.HasIdentity;
import org.apache.zest.api.injection.scope.Service;
@@ -31,9 +31,6 @@ import org.apache.zest.api.service.ServiceReference;
import org.apache.zest.api.structure.ModuleDescriptor;
import org.apache.zest.index.reindexer.Reindexer;
import org.apache.zest.index.reindexer.ReindexerConfiguration;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.entity.EntityState;
import org.apache.zest.spi.entitystore.EntityStore;
import org.apache.zest.spi.entitystore.StateChangeListener;
@@ -67,52 +64,43 @@ public class ReindexerMixin
{
loadValue = 50;
}
- new ReindexerOutput( loadValue ).reindex( store );
+ ReindexerHelper helper = new ReindexerHelper( loadValue );
+ helper.reindex( store );
}
- private class ReindexerOutput
- implements Output<EntityState, RuntimeException>, Receiver<EntityState, RuntimeException>
+ private class ReindexerHelper
{
private int count;
private int loadValue;
private ArrayList<EntityState> states;
- public ReindexerOutput( Integer loadValue )
+ private ReindexerHelper( int loadValue )
{
this.loadValue = loadValue;
states = new ArrayList<>();
}
- public void reindex( EntityStore store )
+ private void reindex( EntityStore store )
{
-
- store.entityStates( module ).transferTo( this );
- reindexState();
- }
-
- @Override
- public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends EntityState, SenderThrowableType> sender )
- throws RuntimeException, SenderThrowableType
- {
- sender.sendTo( this );
- reindexState();
- }
-
- @Override
- public void receive( EntityState item )
- throws RuntimeException
- {
- count++;
- item.setPropertyValue( HasIdentity.IDENTITY_STATE_NAME, item.entityReference().identity() );
- states.add( item );
-
- if( states.size() >= loadValue )
+ try( Stream<EntityState> entityStates = store.entityStates( module ) )
{
- reindexState();
+ entityStates
+ .forEach( entityState ->
+ {
+ count++;
+ entityState.setPropertyValue( HasIdentity.IDENTITY_STATE_NAME,
+ entityState.entityReference().identity() );
+ states.add( entityState );
+ if( states.size() >= loadValue )
+ {
+ reindexState();
+ }
+ } );
}
+ reindexState();
}
- public void reindexState()
+ private void reindexState()
{
for( ServiceReference<StateChangeListener> listener : listeners )
{
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/libraries/logging/src/test/java/org/apache/zest/library/logging/DebuggingTest.java
----------------------------------------------------------------------
diff --git a/libraries/logging/src/test/java/org/apache/zest/library/logging/DebuggingTest.java b/libraries/logging/src/test/java/org/apache/zest/library/logging/DebuggingTest.java
index ae75fb4..1fccdde 100644
--- a/libraries/logging/src/test/java/org/apache/zest/library/logging/DebuggingTest.java
+++ b/libraries/logging/src/test/java/org/apache/zest/library/logging/DebuggingTest.java
@@ -20,9 +20,8 @@
package org.apache.zest.library.logging;
-import java.util.function.Function;
+import java.util.stream.Stream;
import org.apache.zest.api.identity.Identity;
-import org.junit.Test;
import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.mixin.Mixins;
import org.apache.zest.api.service.ServiceComposite;
@@ -31,8 +30,6 @@ import org.apache.zest.api.unitofwork.UnitOfWork;
import org.apache.zest.api.unitofwork.UnitOfWorkCompletionException;
import org.apache.zest.bootstrap.AssemblyException;
import org.apache.zest.bootstrap.ModuleAssembly;
-import org.apache.zest.io.Outputs;
-import org.apache.zest.io.Transforms;
import org.apache.zest.library.logging.debug.Debug;
import org.apache.zest.library.logging.debug.DebugConcern;
import org.apache.zest.library.logging.debug.records.ServiceDebugRecordEntity;
@@ -42,6 +39,7 @@ import org.apache.zest.spi.entity.EntityState;
import org.apache.zest.spi.entitystore.EntityStore;
import org.apache.zest.test.AbstractZestTest;
import org.apache.zest.test.EntityTestAssembler;
+import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -76,19 +74,18 @@ public class DebuggingTest
assertEquals( message, "Hello!" );
EntityStore es = serviceFinder.findService( EntityStore.class ).get();
final Identity[] result = new Identity[1];
- es.entityStates( module ).transferTo( Transforms.map( new Function<EntityState, EntityState>()
- {
- public EntityState apply( EntityState entityState )
- {
- if( ServiceDebugRecordEntity.class.getName()
- .equals( entityState.entityDescriptor().types().findFirst().get().getName() ) )
- {
- result[0] = entityState.entityReference().identity();
- }
-
- return entityState;
- }
- }, Outputs.<EntityState>noop() ));
+ try( Stream<EntityState> entityStates = es.entityStates( module ) )
+ {
+ entityStates
+ .forEach( entityState ->
+ {
+ if( ServiceDebugRecordEntity.class.getName().equals(
+ entityState.entityDescriptor().types().findFirst().get().getName() ) )
+ {
+ result[ 0 ] = entityState.entityReference().identity();
+ }
+ } );
+ }
ServiceDebugRecordEntity debugEntry = uow.get( ServiceDebugRecordEntity.class, result[ 0 ] );
String mess = debugEntry.message().get();
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/libraries/sql/src/main/java/org/apache/zest/library/sql/common/SQLUtil.java
----------------------------------------------------------------------
diff --git a/libraries/sql/src/main/java/org/apache/zest/library/sql/common/SQLUtil.java b/libraries/sql/src/main/java/org/apache/zest/library/sql/common/SQLUtil.java
index 12a2bd4..e87c9c6 100644
--- a/libraries/sql/src/main/java/org/apache/zest/library/sql/common/SQLUtil.java
+++ b/libraries/sql/src/main/java/org/apache/zest/library/sql/common/SQLUtil.java
@@ -26,43 +26,94 @@ import java.sql.Statement;
public class SQLUtil
{
-
public static void closeQuietly( ResultSet resultSet )
{
- if ( resultSet != null ) {
- try {
+ closeQuietly( resultSet, null );
+ }
+
+ public static void closeQuietly( ResultSet resultSet, Throwable originalException )
+ {
+ if( resultSet != null )
+ {
+ try
+ {
resultSet.close();
- } catch ( SQLException ignored ) {
+ }
+ catch( SQLException ignored )
+ {
+ if( originalException != null )
+ {
+ originalException.addSuppressed( ignored );
+ }
}
}
}
public static void closeQuietly( Statement select )
{
- if ( select != null ) {
- try {
+ closeQuietly( select, null );
+ }
+
+ public static void closeQuietly( Statement select, Throwable originalException )
+ {
+ if( select != null )
+ {
+ try
+ {
select.close();
- } catch ( SQLException ignored ) {
+ }
+ catch( SQLException ignored )
+ {
+ if( originalException != null )
+ {
+ originalException.addSuppressed( ignored );
+ }
}
}
}
public static void closeQuietly( Connection connection )
{
- if ( connection != null ) {
- try {
+ closeQuietly( connection, null );
+ }
+
+ public static void closeQuietly( Connection connection, Throwable originalException )
+ {
+ if( connection != null )
+ {
+ try
+ {
connection.close();
- } catch ( SQLException ignored ) {
+ }
+ catch( SQLException ignored )
+ {
+ if( originalException != null )
+ {
+ originalException.addSuppressed( ignored );
+ }
}
}
}
public static void rollbackQuietly( Connection connection )
{
- if ( connection != null ) {
- try {
+ rollbackQuietly( connection, null );
+ }
+
+ public static void rollbackQuietly( Connection connection, Throwable originalException )
+ {
+ if( connection != null )
+ {
+ try
+ {
connection.rollback();
- } catch ( SQLException ignored ) {
+ }
+ catch( SQLException ignored )
+ {
+ if( originalException != null )
+ {
+ originalException.addSuppressed( ignored );
+ }
}
}
}
@@ -70,5 +121,4 @@ public class SQLUtil
private SQLUtil()
{
}
-
}
[2/2] zest-java git commit: entitystores: replace usage of core/io
with streams
Posted by pa...@apache.org.
entitystores: replace usage of core/io with streams
Project: http://git-wip-us.apache.org/repos/asf/zest-java/repo
Commit: http://git-wip-us.apache.org/repos/asf/zest-java/commit/eb4e31a9
Tree: http://git-wip-us.apache.org/repos/asf/zest-java/tree/eb4e31a9
Diff: http://git-wip-us.apache.org/repos/asf/zest-java/diff/eb4e31a9
Branch: refs/heads/replace-io-by-streams
Commit: eb4e31a97a92609feb2e7bb18d135de432d511db
Parents: 7ac5338
Author: Paul Merlin <pa...@apache.org>
Authored: Sat Dec 3 12:12:22 2016 +0100
Committer: Paul Merlin <pa...@apache.org>
Committed: Sat Dec 3 12:12:22 2016 +0100
----------------------------------------------------------------------
.../memory/MemoryMapEntityStoreMixin.java | 107 ++----
.../zest/spi/entitystore/BackupRestore.java | 25 +-
.../zest/spi/entitystore/EntityStore.java | 10 +-
.../helpers/JSONMapEntityStoreMixin.java | 125 +++----
.../spi/entitystore/helpers/MapEntityStore.java | 6 +-
.../helpers/MapEntityStoreMixin.java | 112 +++---
.../test/entity/AbstractEntityStoreTest.java | 32 ++
.../entitystore/file/FileEntityStoreMixin.java | 98 ++----
.../geode/GeodeEntityStoreMixin.java | 29 +-
.../hazelcast/HazelcastEntityStoreMixin.java | 29 +-
.../jclouds/JCloudsMapEntityStoreMixin.java | 73 ++--
.../entitystore/jdbm/JdbmEntityStoreMixin.java | 293 ++++++++--------
.../leveldb/LevelDBEntityStoreMixin.java | 65 ++--
.../mongodb/MongoMapEntityStoreMixin.java | 42 +--
.../prefs/PreferencesEntityStoreMixin.java | 55 +--
.../redis/RedisMapEntityStoreMixin.java | 42 +--
.../riak/RiakMapEntityStoreMixin.java | 82 ++---
.../entitystore/sql/SQLEntityStoreMixin.java | 337 ++++++++++---------
.../apache/zest/migration/MigrationTest.java | 90 ++---
.../reindexer/internal/ReindexerMixin.java | 54 ++-
.../zest/library/logging/DebuggingTest.java | 31 +-
.../apache/zest/library/sql/common/SQLUtil.java | 78 ++++-
22 files changed, 739 insertions(+), 1076 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java b/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java
index 893e17a..79ad54d 100644
--- a/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java
+++ b/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java
@@ -23,18 +23,13 @@ import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
+import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
+import java.util.stream.Stream;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.entitystore.BackupRestore;
import org.apache.zest.spi.entitystore.EntityAlreadyExistsException;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
@@ -42,6 +37,9 @@ import org.apache.zest.spi.entitystore.EntityStoreException;
import org.apache.zest.spi.entitystore.helpers.JSONKeys;
import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
import org.apache.zest.spi.entitystore.helpers.MapEntityStoreActivation;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
/**
* In-memory implementation of MapEntityStore.
@@ -90,95 +88,34 @@ public class MemoryMapEntityStoreMixin
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
- {
- for( String state : store.values() )
- {
- receiver.receive( new StringReader( state ) );
- }
- }
- } );
- }
- };
+ return store.values().stream().map( StringReader::new );
}
@Override
- public Input<String, IOException> backup()
+ public Stream<String> backup()
{
- return new Input<String, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<String, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super String, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
- {
- for( String state : store.values() )
- {
- receiver.receive( state );
- }
- }
- } );
- }
- };
+ return store.values().stream();
}
@Override
- public Output<String, IOException> restore()
+ public void restore( final Stream<String> stream )
{
- return new Output<String, IOException>()
- {
- @Override
- public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
- throws IOException, SenderThrowableType
+ store.clear();
+ stream.forEach( item -> {
+ try
{
- store.clear();
-
- try
- {
- sender.sendTo( new Receiver<String, IOException>()
- {
- @Override
- public void receive( String item )
- throws IOException
- {
- try
- {
- JSONTokener tokener = new JSONTokener( item );
- JSONObject entity = (JSONObject) tokener.nextValue();
- String id = entity.getString( JSONKeys.IDENTITY );
- store.put( EntityReference.parseEntityReference( id ), item );
- }
- catch( JSONException e )
- {
- throw new IOException( e );
- }
- }
- } );
- }
- catch( IOException e )
- {
- store.clear();
- throw e;
- }
+ JSONTokener tokener = new JSONTokener( item );
+ JSONObject entity = (JSONObject) tokener.nextValue();
+ String id = entity.getString( JSONKeys.IDENTITY );
+ store.put( EntityReference.parseEntityReference( id ), item );
+ }
+ catch( JSONException e )
+ {
+ throw new UncheckedIOException( new IOException( e ) );
}
- };
+ } );
}
private class MemoryMapChanger
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java
index e7cb5be..63aa3aa 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java
@@ -20,9 +20,8 @@
package org.apache.zest.spi.entitystore;
-import java.io.IOException;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
/**
* Allow backups and restores of data in an EntityStore to be made
@@ -30,16 +29,22 @@ import org.apache.zest.io.Output;
public interface BackupRestore
{
/**
- * Input that allows data from the entity store to be backed up.
- *
- * @return An Input instance containing the data to back up.
+ * Backup as a stream of serialized entity states; the returned stream must be closed.
+ */
+ Stream<String> backup();
+
+ /**
+ * Restore from a stream of serialized entity states.
*/
- Input<String, IOException> backup();
+ void restore( Stream<String> states );
/**
- * Output that allows data to be restored from a backup.
+ * Restore from streams of serialized entity states.
*
- * @return An Output instance to receive the restored data.
+ * @return A consumer of streams of serialized entity states
*/
- Output<String, IOException> restore();
+ default Consumer<Stream<String>> restore()
+ {
+ return this::restore;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java
index 71752b0..2e002c2 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java
@@ -20,9 +20,9 @@
package org.apache.zest.spi.entitystore;
import java.time.Instant;
+import java.util.stream.Stream;
import org.apache.zest.api.structure.ModuleDescriptor;
import org.apache.zest.api.usecase.Usecase;
-import org.apache.zest.io.Input;
import org.apache.zest.spi.entity.EntityState;
/**
@@ -32,5 +32,11 @@ public interface EntityStore
{
EntityStoreUnitOfWork newUnitOfWork( ModuleDescriptor module, Usecase usecase, Instant currentTime );
- Input<EntityState, EntityStoreException> entityStates( ModuleDescriptor module );
+ /**
+ * Stream of all entity states; the returned stream must be closed.
+ *
+ * @param module Module
+ * @return Stream of all entity states; the returned stream must be closed
+ */
+ Stream<EntityState> entityStates( ModuleDescriptor module );
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java
index 90d5acb..ebaa2f2 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java
@@ -29,6 +29,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.stream.Stream;
import org.apache.zest.api.cache.CacheOptions;
import org.apache.zest.api.common.Optional;
import org.apache.zest.api.entity.EntityDescriptor;
@@ -47,10 +48,6 @@ import org.apache.zest.api.structure.ModuleDescriptor;
import org.apache.zest.api.unitofwork.NoSuchEntityTypeException;
import org.apache.zest.api.usecase.Usecase;
import org.apache.zest.api.value.ValueSerialization;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.ZestSPI;
import org.apache.zest.spi.cache.Cache;
import org.apache.zest.spi.cache.CachePool;
@@ -310,102 +307,62 @@ public class JSONMapEntityStoreMixin
}
@Override
- public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module )
+ public Stream<EntityState> entityStates( ModuleDescriptor module )
{
- return new Input<EntityState, EntityStoreException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output )
- throws EntityStoreException, ReceiverThrowableType
+ List<EntityState> migrated = new ArrayList<>();
+ return mapEntityStore.entityStates().map(
+ reader ->
{
- output.receiveFrom( new Sender<EntityState, EntityStoreException>()
+ EntityState entity = readEntityState( module, reader );
+ if( entity.status() == EntityStatus.UPDATED )
{
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super EntityState, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, EntityStoreException
+ migrated.add( entity );
+ // Synch back 100 at a time
+ if( migrated.size() > 100 )
{
- final List<EntityState> migrated = new ArrayList<>();
- try
- {
- mapEntityStore.entityStates().transferTo( new Output<Reader, ReceiverThrowableType>()
- {
- @Override
- public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends Reader, SenderThrowableType> sender )
- throws ReceiverThrowableType, SenderThrowableType
- {
- sender.sendTo( new Receiver<Reader, ReceiverThrowableType>()
- {
- @Override
- public void receive( Reader item )
- throws ReceiverThrowableType
- {
- final EntityState entity = readEntityState( module, item );
- if( entity.status() == EntityStatus.UPDATED )
- {
- migrated.add( entity );
-
- // Synch back 100 at a time
- if( migrated.size() > 100 )
- {
- try
- {
- synchMigratedEntities( migrated );
- }
- catch( IOException e )
- {
- throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
- }
- }
- }
- receiver.receive( entity );
- }
- } );
-
- // Synch any remaining migrated entities
- if( !migrated.isEmpty() )
- {
- try
- {
- synchMigratedEntities( migrated );
- }
- catch( IOException e )
- {
- throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
- }
- }
- }
- } );
- }
- catch( IOException e )
- {
- throw new EntityStoreException( e );
- }
+ synchMigratedEntities( migrated );
}
- } );
+ }
+ return entity;
}
- };
+ ).onClose(
+ () ->
+ {
+ // Synch any remaining migrated entities
+ if( !migrated.isEmpty() )
+ {
+ synchMigratedEntities( migrated );
+ }
+ }
+ );
}
private void synchMigratedEntities( final List<EntityState> migratedEntities )
- throws IOException
{
- mapEntityStore.applyChanges( new MapEntityStore.MapChanges()
+ try
{
- @Override
- public void visitMap( MapEntityStore.MapChanger changer )
- throws IOException
+ mapEntityStore.applyChanges( new MapEntityStore.MapChanges()
{
- for( EntityState migratedEntity : migratedEntities )
+ @Override
+ public void visitMap( MapEntityStore.MapChanger changer )
+ throws IOException
{
- JSONEntityState state = (JSONEntityState) migratedEntity;
- try (Writer writer = changer.updateEntity( state.entityReference(), state.entityDescriptor() ))
+ for( EntityState migratedEntity : migratedEntities )
{
- writeEntityState( state, writer, state.version(), state.lastModified() );
+ JSONEntityState state = (JSONEntityState) migratedEntity;
+ try( Writer writer = changer.updateEntity( state.entityReference(), state.entityDescriptor() ) )
+ {
+ writeEntityState( state, writer, state.version(), state.lastModified() );
+ }
}
}
- }
- } );
- migratedEntities.clear();
+ } );
+ migratedEntities.clear();
+ }
+ catch( IOException ex )
+ {
+ throw new EntityStoreException( "Synchronization of Migrated Entities failed.", ex );
+ }
}
protected Identity newUnitOfWorkId()
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java
index 2ec20a0..435494f 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java
@@ -22,9 +22,9 @@ package org.apache.zest.spi.entitystore.helpers;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
+import java.util.stream.Stream;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
-import org.apache.zest.io.Input;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
import org.apache.zest.spi.entitystore.EntityStoreException;
@@ -42,9 +42,9 @@ public interface MapEntityStore
throws EntityStoreException;
/**
- * @return All entities state Readers
+ * @return Readers for all entity states; the returned stream must be closed
*/
- Input<Reader, IOException> entityStates();
+ Stream<Reader> entityStates();
void applyChanges( MapChanges changes )
throws IOException;
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java
index fceae54..8a61ced 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import org.apache.zest.api.common.Optional;
import org.apache.zest.api.common.QualifiedName;
import org.apache.zest.api.entity.EntityDescriptor;
@@ -46,10 +47,6 @@ import org.apache.zest.api.unitofwork.NoSuchEntityTypeException;
import org.apache.zest.api.usecase.Usecase;
import org.apache.zest.api.value.ValueSerialization;
import org.apache.zest.api.value.ValueSerializationException;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.ZestSPI;
import org.apache.zest.spi.entity.EntityState;
import org.apache.zest.spi.entity.EntityStatus;
@@ -200,74 +197,47 @@ public class MapEntityStoreMixin
}
@Override
- public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module )
+ public Stream<EntityState> entityStates( final ModuleDescriptor module )
{
- return new Input<EntityState, EntityStoreException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output )
- throws EntityStoreException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<EntityState, EntityStoreException>()
- {
- @Override
- public <RecThrowableType extends Throwable> void sendTo( final Receiver<? super EntityState, RecThrowableType> receiver )
- throws RecThrowableType, EntityStoreException
- {
- final List<EntityState> migrated = new ArrayList<>();
- try
- {
- mapEntityStore.entityStates().transferTo( new Output<Reader, RecThrowableType>()
- {
- @Override
- public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends Reader, SenderThrowableType> sender )
- throws RecThrowableType, SenderThrowableType
- {
- sender.sendTo( item -> {
- final EntityState entity = readEntityState( module, item );
- if( entity.status() == EntityStatus.UPDATED )
- {
- migrated.add( entity );
-
- // Synch back 100 at a time
- if( migrated.size() > 100 )
- {
- try
- {
- synchMigratedEntities( migrated );
- }
- catch( IOException e )
- {
- throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
- }
- }
- }
- receiver.receive( entity );
- } );
-
- // Synch any remaining migrated entities
- if( !migrated.isEmpty() )
- {
- try
- {
- synchMigratedEntities( migrated );
- }
- catch( IOException e )
- {
- throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
- }
- }
- }
- } );
- }
- catch( IOException e )
- {
- throw new EntityStoreException( e );
- }
- }
- } );
- }
- };
+ List<EntityState> migrated = new ArrayList<>();
+ return mapEntityStore
+ .entityStates()
+ .map( reader ->
+ {
+ EntityState entity = readEntityState( module, reader );
+ if( entity.status() == EntityStatus.UPDATED )
+ {
+ migrated.add( entity );
+ // Synch back 100 at a time
+ if( migrated.size() > 100 )
+ {
+ try
+ {
+ synchMigratedEntities( migrated );
+ }
+ catch( IOException e )
+ {
+ throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
+ }
+ }
+ }
+ return entity;
+ } )
+ .onClose( () ->
+ {
+ // Synch any remaining migrated entities
+ if( !migrated.isEmpty() )
+ {
+ try
+ {
+ synchMigratedEntities( migrated );
+ }
+ catch( IOException e )
+ {
+ throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e );
+ }
+ }
+ } );
}
private void synchMigratedEntities( final List<EntityState> migratedEntities )
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java
----------------------------------------------------------------------
diff --git a/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java b/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java
index ab05f39..74368ae 100644
--- a/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java
+++ b/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java
@@ -31,6 +31,7 @@ import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import org.apache.zest.api.association.Association;
import org.apache.zest.api.association.ManyAssociation;
import org.apache.zest.api.association.NamedAssociation;
@@ -51,6 +52,7 @@ import org.apache.zest.api.value.ValueBuilder;
import org.apache.zest.api.value.ValueComposite;
import org.apache.zest.bootstrap.AssemblyException;
import org.apache.zest.bootstrap.ModuleAssembly;
+import org.apache.zest.spi.entity.EntityState;
import org.apache.zest.spi.entitystore.EntityStore;
import org.apache.zest.test.AbstractZestTest;
import org.junit.After;
@@ -501,6 +503,36 @@ public abstract class AbstractEntityStoreTest
}
}
+ @Test
+ public void entityStatesSPI()
+ {
+ EntityStore entityStore = serviceFinder.findService( EntityStore.class ).get();
+
+ try( Stream<EntityState> states = entityStore.entityStates( module ) )
+ {
+ assertThat( states.count(), is( 0L ) );
+ }
+
+ UnitOfWork unitOfWork = unitOfWorkFactory.newUnitOfWork();
+ TestEntity newInstance = createEntity( unitOfWork );
+ unitOfWork.complete();
+
+ try( Stream<EntityState> states = entityStore.entityStates( module ) )
+ {
+ assertThat( states.count(), is( 1L ) );
+ }
+
+ unitOfWork = unitOfWorkFactory.newUnitOfWork();
+ TestEntity instance = unitOfWork.get( newInstance );
+ unitOfWork.remove( instance );
+ unitOfWork.complete();
+
+ try( Stream<EntityState> states = entityStore.entityStates( module ) )
+ {
+ assertThat( states.count(), is( 0L ) );
+ }
+ }
+
public interface TestEntity
extends EntityComposite
{
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java b/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java
index 1ab1c53..500f313 100644
--- a/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java
+++ b/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java
@@ -29,16 +29,13 @@ import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
+import java.util.stream.Stream;
import org.apache.zest.api.common.Optional;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
import org.apache.zest.api.injection.scope.Service;
import org.apache.zest.api.injection.scope.This;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.library.fileconfig.FileConfiguration;
import org.apache.zest.spi.entitystore.BackupRestore;
import org.apache.zest.spi.entitystore.EntityAlreadyExistsException;
@@ -231,84 +228,43 @@ public class FileEntityStoreMixin
}
@Override
- public Input<String, IOException> backup()
+ public Stream<Reader> entityStates()
{
- return new Input<String, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<String, IOException>()
- {
- @Override
- public <ThrowableType extends Throwable> void sendTo( Receiver<? super String, ThrowableType> receiver )
- throws ThrowableType, IOException
- {
- for( File sliceDirectory : dataDirectory.listFiles() )
- {
- for( File file : sliceDirectory.listFiles() )
- {
- receiver.receive( fetch( file ) );
- }
- }
- }
- } );
- }
- };
+ return backup().map( StringReader::new );
}
@Override
- public Output<String, IOException> restore()
+ public Stream<String> backup()
{
- return new Output<String, IOException>()
+ if( !dataDirectory.exists() )
{
- @Override
- public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
- throws IOException, SenderThrowableType
- {
- sender.sendTo( new Receiver<String, IOException>()
- {
- @Override
- public void receive( String item )
- throws IOException
- {
- String id = item.substring( "{\"reference\":\"".length() );
- id = id.substring( 0, id.indexOf( '"' ) );
- store( getDataFile( id ), item );
- }
- } );
- }
- };
+ return Stream.of();
+ }
+ try
+ {
+ return java.nio.file.Files.walk( dataDirectory.toPath(), 3 )
+ .skip( 1 )
+ .filter( path -> !"slices".equals( path.getFileName().toString() ) )
+ .map( Path::toFile )
+ .filter( file -> !file.isDirectory() )
+ .map( this::uncheckedFetch );
+ }
+ catch( IOException ex )
+ {
+ throw new EntityStoreException( ex );
+ }
}
@Override
- public Input<Reader, IOException> entityStates()
+ public void restore( final Stream<String> stream )
{
- return new Input<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
+ stream.forEach(
+ item ->
{
- output.receiveFrom( new Sender<Reader, IOException>()
- {
- @Override
- public <ThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ThrowableType> receiver )
- throws ThrowableType, IOException
- {
- for( File sliceDirectory : dataDirectory.listFiles() )
- {
- for( File file : sliceDirectory.listFiles() )
- {
- String state = fetch( file );
- receiver.receive( new StringReader( state ) );
- }
- }
- }
- } );
- }
- };
+ String id = item.substring( "{\"reference\":\"".length() );
+ id = id.substring( 0, id.indexOf( '"' ) );
+ uncheckedStore( getDataFile( id ), item );
+ } );
}
private File getDataFile( String identity )
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java b/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java
index fca8a6b..25a58b7 100644
--- a/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java
+++ b/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java
@@ -23,8 +23,8 @@ import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
-import java.util.Map;
import java.util.Properties;
+import java.util.stream.Stream;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
@@ -39,10 +39,6 @@ import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
import org.apache.zest.spi.entitystore.EntityStoreException;
import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
@@ -197,27 +193,8 @@ public class GeodeEntityStoreMixin
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<Reader, IOException>()
- {
- @Override
- public <RTT extends Throwable> void sendTo( Receiver<? super Reader, RTT> receiver )
- throws RTT, IOException
- {
- for( Map.Entry<String, String> eachEntry : region.entrySet() )
- {
- receiver.receive( new StringReader( eachEntry.getValue() ) );
- }
- }
- } );
- }
- };
+ return region.values().stream().map( StringReader::new );
}
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java b/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java
index f588862..34386f2 100644
--- a/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java
+++ b/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java
@@ -30,16 +30,12 @@ import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
-import java.util.Map;
+import java.util.stream.Stream;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
import org.apache.zest.spi.entitystore.EntityStoreException;
import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
@@ -147,28 +143,9 @@ public class HazelcastEntityStoreMixin
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<Reader, IOException>()
- {
- @Override
- public <RTT extends Throwable> void sendTo( Receiver<? super Reader, RTT> receiver )
- throws RTT, IOException
- {
- for( Map.Entry<String, String> eachEntry : stringMap.entrySet() )
- {
- receiver.receive( new StringReader( eachEntry.getValue() ) );
- }
- }
- } );
- }
- };
+ return stringMap.values().stream().map( StringReader::new );
}
private Config createConfig( HazelcastConfiguration configuration )
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java b/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java
index f8c2ac1..b895177 100644
--- a/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java
+++ b/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java
@@ -26,7 +26,6 @@ import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
@@ -36,15 +35,12 @@ import java.util.Map;
import java.util.Properties;
import java.util.Scanner;
import java.util.Set;
+import java.util.stream.Stream;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
import org.apache.zest.spi.entitystore.EntityStoreException;
import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
@@ -54,7 +50,6 @@ import org.jclouds.apis.Apis;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
-import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.io.Payload;
import org.jclouds.providers.ProviderMetadata;
import org.jclouds.providers.Providers;
@@ -254,52 +249,26 @@ public class JCloudsMapEntityStoreMixin
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom(
- new Sender<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
- {
- for( StorageMetadata stored : storeContext.getBlobStore().list() )
- {
- Payload payload = storeContext.getBlobStore().getBlob( container, stored.getName() ).getPayload();
- if( payload == null )
- {
- throw new EntityNotFoundException( EntityReference.parseEntityReference( stored.getName() ) );
- }
- InputStream input = null;
- try
- {
- input = payload.openStream();
- receiver.receive( new InputStreamReader( input, "UTF-8" ) );
- }
- finally
- {
- if( input != null )
- {
- try
- {
- input.close();
- }
- catch( IOException ignored )
- {
- }
- }
- }
- }
- }
- }
- );
- }
- };
+ return storeContext
+ .getBlobStore().list( container ).stream()
+ .map( metadata ->
+ {
+ Payload payload = storeContext.getBlobStore().getBlob( container, metadata.getName() ).getPayload();
+ if( payload == null )
+ {
+ throw new EntityNotFoundException( EntityReference.parseEntityReference( metadata.getName() ) );
+ }
+ try( InputStream input = payload.openStream() )
+ {
+ String state = new Scanner( input, UTF_8.name() ).useDelimiter( "\\Z" ).next();
+ return (Reader) new StringReader( state );
+ }
+ catch( IOException ex )
+ {
+ throw new EntityStoreException( ex );
+ }
+ } );
}
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java b/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java
index fe3a9cf..19c2b7d 100644
--- a/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java
+++ b/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java
@@ -24,9 +24,16 @@ import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
+import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Properties;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import jdbm.RecordManager;
import jdbm.RecordManagerFactory;
import jdbm.RecordManagerOptions;
@@ -48,10 +55,6 @@ import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.injection.scope.Uses;
import org.apache.zest.api.service.ServiceDescriptor;
import org.apache.zest.io.Files;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.library.fileconfig.FileConfiguration;
import org.apache.zest.library.locking.ReadLock;
import org.apache.zest.library.locking.WriteLock;
@@ -213,198 +216,166 @@ public class JdbmEntityStoreMixin
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- lock.writeLock().lock();
-
- try
- {
- output.receiveFrom( new Sender<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
- {
- final TupleBrowser browser = index.browse();
- final Tuple tuple = new Tuple();
-
- while( browser.getNext( tuple ) )
- {
- Identity id = new StringIdentity( (byte[]) tuple.getKey() );
-
- Long stateIndex = getStateIndex( id );
-
- if( stateIndex == null )
- {
- continue;
- } // Skip this one
-
- byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer );
-
- receiver.receive( new StringReader( new String( serializedState, "UTF-8" ) ) );
- }
- }
- } );
- }
- finally
- {
- lock.writeLock().unlock();
- }
- }
- };
+ return backup().map( StringReader::new );
}
@Override
- public Input<String, IOException> backup()
+ public Stream<String> backup()
{
- return new Input<String, IOException>()
+ lock.writeLock().lock();
+ TupleBrowser browser;
+ try
{
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
+ browser = index.browse();
+ }
+ catch( IOException ex )
+ {
+ lock.writeLock().unlock();
+ throw new EntityStoreException( ex );
+ }
+ return StreamSupport.stream(
+ new Spliterators.AbstractSpliterator<String>( Long.MAX_VALUE, Spliterator.ORDERED )
{
- lock.readLock().lock();
+ private final Tuple tuple = new Tuple();
- try
+ @Override
+ public boolean tryAdvance( final Consumer<? super String> action )
{
- output.receiveFrom( new Sender<String, IOException>()
+ try
{
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super String, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
+ while( browser.getNext( tuple ) )
{
- final TupleBrowser browser = index.browse();
- final Tuple tuple = new Tuple();
-
- while( browser.getNext( tuple ) )
- {
- String id = new String( (byte[]) tuple.getKey(), "UTF-8" );
-
- Long stateIndex = getStateIndex( new StringIdentity( id ) );
-
- if( stateIndex == null )
- {
- continue;
- } // Skip this one
-
- byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer );
-
- receiver.receive( new String( serializedState, "UTF-8" ) );
- }
+ Identity identity = new StringIdentity( (byte[]) tuple.getKey() );
+ Long stateIndex = getStateIndex( identity );
+ if( stateIndex == null )
+ {
+ continue; // Skip this one, as the replaced loop did, instead of ending the stream early
+ }
+ byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer );
+ String state = new String( serializedState, "UTF-8" );
+ action.accept( state );
+ return true;
}
- } );
- }
- finally
- {
- lock.readLock().unlock();
+ return false;
+ }
+ catch( IOException ex )
+ {
+ lock.writeLock().unlock();
+ throw new EntityStoreException( ex );
+ }
}
- }
- };
+ },
+ false
+ ).onClose( () -> lock.writeLock().unlock() );
}
@Override
- public Output<String, IOException> restore()
+ public void restore( final Stream<String> states )
{
- return new Output<String, IOException>()
+ File dbFile = new File( getDatabaseName() + ".db" );
+ File lgFile = new File( getDatabaseName() + ".lg" );
+
+ // Create temporary store
+ File tempDatabase = Files.createTemporayFileOf( dbFile );
+ final RecordManager recordManager;
+ final BTree index;
+ try
+ {
+ recordManager = RecordManagerFactory.createRecordManager( tempDatabase.getAbsolutePath(),
+ new Properties() );
+ ByteArrayComparator comparator = new ByteArrayComparator();
+ index = BTree.createInstance( recordManager, comparator, serializer, DefaultSerializer.INSTANCE, 16 );
+ recordManager.setNamedObject( "index", index.getRecid() );
+ recordManager.commit();
+ }
+ catch( IOException ex )
+ {
+ throw new EntityStoreException( ex );
+ }
+ try
{
- @Override
- public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
- throws IOException, SenderThrowableType
+ // TODO NO NEED TO SYNCHRONIZE HERE
+ AtomicLong counter = new AtomicLong();
+ Consumer<String> stateConsumer = state ->
{
- File dbFile = new File( getDatabaseName() + ".db" );
- File lgFile = new File( getDatabaseName() + ".lg" );
-
- // Create temporary store
- File tempDatabase = Files.createTemporayFileOf( dbFile );
-
- final RecordManager recordManager = RecordManagerFactory.createRecordManager( tempDatabase.getAbsolutePath(), new Properties() );
- ByteArrayComparator comparator = new ByteArrayComparator();
- final BTree index = BTree.createInstance( recordManager, comparator, serializer, DefaultSerializer.INSTANCE, 16 );
- recordManager.setNamedObject( "index", index.getRecid() );
- recordManager.commit();
-
try
{
- sender.sendTo( new Receiver<String, IOException>()
+ // Commit one batch
+ if( ( counter.incrementAndGet() % 1000 ) == 0 )
{
- int counter = 0;
-
- @Override
- public void receive( String item )
- throws IOException
- {
- // Commit one batch
- if( ( counter++ % 1000 ) == 0 )
- {
- recordManager.commit();
- }
+ recordManager.commit();
+ }
- String id = item.substring( "{\"reference\":\"".length() );
- id = id.substring( 0, id.indexOf( '"' ) );
+ String id = state.substring( "{\"reference\":\"".length() );
+ id = id.substring( 0, id.indexOf( '"' ) );
- // Insert
- byte[] stateArray = item.getBytes( "UTF-8" );
- long stateIndex = recordManager.insert( stateArray, serializer );
- index.insert( id.getBytes( "UTF-8" ), stateIndex, false );
- }
- } );
+ // Insert
+ byte[] stateArray = state.getBytes( "UTF-8" );
+ long stateIndex = recordManager.insert( stateArray, serializer );
+ index.insert( id.getBytes( "UTF-8" ), stateIndex, false );
}
- catch( IOException e )
+ catch( IOException ex )
{
- recordManager.close();
- tempDatabase.delete();
- throw e;
+ throw new UncheckedIOException( ex );
}
- catch( Throwable senderThrowableType )
- {
- recordManager.close();
- tempDatabase.delete();
- throw (SenderThrowableType) senderThrowableType;
- }
-
- // Import went ok - continue
- recordManager.commit();
- // close file handles otherwise Microsoft Windows will fail to rename database files.
+ };
+ states.forEach( stateConsumer );
+ // Import went ok - continue
+ recordManager.commit();
+ // close file handles otherwise Microsoft Windows will fail to rename database files.
+ recordManager.close();
+ }
+ catch( IOException | RuntimeException ex ) // any failed import, not only I/O, must release the temporary store
+ {
+ try
+ {
+ recordManager.close();
+ }
+ catch( IOException ignore ) { }
+ tempDatabase.delete();
+ throw new EntityStoreException( ex );
+ }
+ try
+ {
- lock.writeLock().lock();
- try
- {
- // Replace old database with new
- JdbmEntityStoreMixin.this.recordManager.close();
-
- boolean deletedOldDatabase = true;
- deletedOldDatabase &= dbFile.delete();
- deletedOldDatabase &= lgFile.delete();
- if( !deletedOldDatabase )
- {
- throw new IOException( "Could not remove old database" );
- }
+ lock.writeLock().lock();
+ try
+ {
+ // Replace old database with new
+ JdbmEntityStoreMixin.this.recordManager.close();
- boolean renamedTempDatabase = true;
- renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".db" ).renameTo( dbFile );
- renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".lg" ).renameTo( lgFile );
+ boolean deletedOldDatabase = true;
+ deletedOldDatabase &= dbFile.delete();
+ deletedOldDatabase &= lgFile.delete();
+ if( !deletedOldDatabase )
+ {
+ throw new EntityStoreException( "Could not remove old database" );
+ }
- if( !renamedTempDatabase )
- {
- throw new IOException( "Could not replace database with temp database" );
- }
+ boolean renamedTempDatabase = true;
+ renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".db" ).renameTo( dbFile );
+ renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".lg" ).renameTo( lgFile );
- // Start up again
- initialize();
- }
- finally
+ if( !renamedTempDatabase )
{
- lock.writeLock().unlock();
+ throw new EntityStoreException( "Could not replace database with temp database" );
}
+
+ // Start up again
+ initialize();
}
- };
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+ catch( IOException ex )
+ {
+ tempDatabase.delete();
+ throw new EntityStoreException( ex );
+ }
}
private String getDatabaseName()
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java b/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java
index 638b021..7e485b2 100644
--- a/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java
+++ b/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java
@@ -26,6 +26,11 @@ import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.charset.Charset;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
@@ -34,10 +39,6 @@ import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.injection.scope.Uses;
import org.apache.zest.api.service.ServiceActivation;
import org.apache.zest.api.service.ServiceDescriptor;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.library.fileconfig.FileConfiguration;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
import org.apache.zest.spi.entitystore.EntityStoreException;
@@ -197,42 +198,38 @@ public class LevelDBEntityStoreMixin
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
- {
-
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
+ DBIterator iterator = db.iterator();
+ iterator.seekToFirst();
+ return StreamSupport.stream(
+ new Spliterators.AbstractSpliterator<Reader>( Long.MAX_VALUE, Spliterator.ORDERED )
{
- output.receiveFrom( new Sender<Reader, IOException>()
+ @Override
+ public boolean tryAdvance( final Consumer<? super Reader> action )
{
-
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
+ if( !iterator.hasNext() )
{
- DBIterator iterator = db.iterator();
- try
- {
- for( iterator.seekToFirst(); iterator.hasNext(); iterator.next() )
- {
- byte[] state = iterator.peekNext().getValue();
- String jsonState = new String( state, charset );
- receiver.receive( new StringReader( jsonState ) );
- }
- }
- finally
- {
- iterator.close();
- }
+ return false;
}
-
- } );
+ action.accept( new StringReader( new String( iterator.next().getValue(), charset ) ) );
+ return true;
+ }
+ },
+ false
+ ).onClose(
+ () ->
+ {
+ try
+ {
+ iterator.close();
+ }
+ catch( IOException ex )
+ {
+ throw new EntityStoreException( "Unable to close DB iterator", ex );
+ }
+ }
-
- };
+ );
}
@Override
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java b/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java
index 9b988f5..ee97171 100644
--- a/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java
+++ b/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java
@@ -25,7 +25,6 @@ import com.mongodb.MongoClientOptions;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.WriteConcern;
-import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
@@ -40,15 +39,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
import org.apache.zest.spi.entitystore.EntityStoreException;
import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
@@ -289,33 +286,16 @@ public class MongoMapEntityStoreMixin
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo(
- Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo(
- Receiver<? super Reader, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
- {
- FindIterable<Document> cursor = db.getCollection( collectionName ).find();
- for( Document eachEntity : cursor )
- {
- Document bsonState = (Document) eachEntity.get( STATE_COLUMN );
- String jsonState = JSON.serialize( bsonState );
- receiver.receive( new StringReader( jsonState ) );
- }
- }
- } );
- }
- };
+ return StreamSupport
+ .stream( db.getCollection( collectionName ).find().spliterator(), false )
+ .map( eachEntity ->
+ {
+ Document bsonState = (Document) eachEntity.get( STATE_COLUMN );
+ String jsonState = JSON.serialize( bsonState );
+ return new StringReader( jsonState );
+ } );
}
private Bson byIdentity( EntityReference entityReference )
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java b/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java
index f76f8d3..09918a2 100644
--- a/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java
+++ b/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.prefs.BackingStoreException;
import java.util.prefs.Preferences;
+import java.util.stream.Stream;
import org.apache.zest.api.cache.CacheOptions;
import org.apache.zest.api.common.QualifiedName;
import org.apache.zest.api.entity.EntityDescriptor;
@@ -57,10 +58,6 @@ import org.apache.zest.api.usecase.Usecase;
import org.apache.zest.api.usecase.UsecaseBuilder;
import org.apache.zest.api.value.ValueSerialization;
import org.apache.zest.api.value.ValueSerializationException;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.ZestSPI;
import org.apache.zest.spi.entity.EntityState;
import org.apache.zest.spi.entity.EntityStatus;
@@ -175,43 +172,23 @@ public class PreferencesEntityStoreMixin
}
@Override
- public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module )
+ public Stream<EntityState> entityStates( final ModuleDescriptor module )
{
- return new Input<EntityState, EntityStoreException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output )
- throws EntityStoreException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<EntityState, EntityStoreException>()
- {
- @Override
- public <RecThrowableType extends Throwable> void sendTo( Receiver<? super EntityState, RecThrowableType> receiver )
- throws RecThrowableType, EntityStoreException
- {
- UsecaseBuilder builder = UsecaseBuilder.buildUsecase( "zest.entitystore.preferences.visit" );
- Usecase visitUsecase = builder.withMetaInfo( CacheOptions.NEVER ).newUsecase();
- final EntityStoreUnitOfWork uow =
- newUnitOfWork( module, visitUsecase, SystemTime.now() );
+ UsecaseBuilder builder = UsecaseBuilder.buildUsecase( "zest.entitystore.preferences.visit" );
+ Usecase visitUsecase = builder.withMetaInfo( CacheOptions.NEVER ).newUsecase();
+ EntityStoreUnitOfWork uow = newUnitOfWork( module, visitUsecase, SystemTime.now() );
- try
- {
- String[] identities = root.childrenNames();
- for( String identity : identities )
- {
- EntityReference reference = EntityReference.parseEntityReference( identity );
- EntityState entityState = uow.entityStateOf( module, reference );
- receiver.receive( entityState );
- }
- }
- catch( BackingStoreException e )
- {
- throw new EntityStoreException( e );
- }
- }
- } );
- }
- };
+ try
+ {
+ return Stream.of( root.childrenNames() )
+ .map( EntityReference::parseEntityReference )
+ .map( ref -> uow.entityStateOf( module, ref ) )
+ .onClose( uow::discard );
+ }
+ catch( BackingStoreException e )
+ {
+ throw new EntityStoreException( e );
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java b/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java
index 4113e39..9bc2446 100644
--- a/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java
+++ b/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java
@@ -24,16 +24,12 @@ import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
-import java.util.Set;
+import java.util.stream.Stream;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
import org.apache.zest.spi.entitystore.EntityAlreadyExistsException;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
import org.apache.zest.spi.entitystore.EntityStoreException;
@@ -164,38 +160,12 @@ public class RedisMapEntityStoreMixin
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
- {
- Jedis jedis = pool.getResource();
- try
- {
- Set<String> keys = jedis.keys( "*" );
- for( String key : keys )
- {
- String jsonState = jedis.get( key );
- receiver.receive( new StringReader( jsonState ) );
- }
- }
- finally
- {
- pool.returnResource( jedis );
- }
- }
- } );
- }
- };
+ Jedis jedis = pool.getResource();
+ return jedis.keys( "*" ).stream()
+ .map( key -> (Reader) new StringReader( jedis.get( key ) ) )
+ .onClose( jedis::close );
}
private static boolean notFound( String jsonState )
http://git-wip-us.apache.org/repos/asf/zest-java/blob/eb4e31a9/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java
----------------------------------------------------------------------
diff --git a/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java b/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java
index 0160dfa..fcda8fa 100644
--- a/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java
+++ b/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java
@@ -28,20 +28,6 @@ import com.basho.riak.client.core.RiakNode;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.core.util.HostAndPort;
-import org.apache.zest.api.common.InvalidApplicationException;
-import org.apache.zest.api.configuration.Configuration;
-import org.apache.zest.api.entity.EntityDescriptor;
-import org.apache.zest.api.entity.EntityReference;
-import org.apache.zest.api.injection.scope.This;
-import org.apache.zest.api.service.ServiceActivation;
-import org.apache.zest.io.Input;
-import org.apache.zest.io.Output;
-import org.apache.zest.io.Receiver;
-import org.apache.zest.io.Sender;
-import org.apache.zest.spi.entitystore.EntityNotFoundException;
-import org.apache.zest.spi.entitystore.EntityStoreException;
-import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -57,6 +43,17 @@ import java.security.Security;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.zest.api.common.InvalidApplicationException;
+import org.apache.zest.api.configuration.Configuration;
+import org.apache.zest.api.entity.EntityDescriptor;
+import org.apache.zest.api.entity.EntityReference;
+import org.apache.zest.api.injection.scope.This;
+import org.apache.zest.api.service.ServiceActivation;
+import org.apache.zest.spi.entitystore.EntityNotFoundException;
+import org.apache.zest.spi.entitystore.EntityStoreException;
+import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
/**
* Riak Protobuf implementation of MapEntityStore.
@@ -334,40 +331,33 @@ public class RiakMapEntityStoreMixin implements ServiceActivation, MapEntityStor
}
@Override
- public Input<Reader, IOException> entityStates()
+ public Stream<Reader> entityStates()
{
- return new Input<Reader, IOException>()
+ try
{
- @Override
- public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
- throws IOException, ReceiverThrowableType
- {
- output.receiveFrom( new Sender<Reader, IOException>()
- {
- @Override
- public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
- throws ReceiverThrowableType, IOException
- {
- try
- {
- ListKeys listKeys = new ListKeys.Builder( namespace ).build();
- ListKeys.Response listKeysResponse = riakClient.execute( listKeys );
- for( Location location : listKeysResponse )
- {
- FetchValue fetch = new FetchValue.Builder( location ).build();
- FetchValue.Response response = riakClient.execute( fetch );
- String jsonState = response.getValue( String.class );
- receiver.receive( new StringReader( jsonState ) );
- }
- }
- catch( InterruptedException | ExecutionException ex )
- {
- throw new EntityStoreException( "Unable to apply entity changes.", ex );
- }
- }
- } );
- }
- };
+ ListKeys listKeys = new ListKeys.Builder( namespace ).build();
+ ListKeys.Response listKeysResponse = riakClient.execute( listKeys );
+ return StreamSupport
+ .stream( listKeysResponse.spliterator(), false )
+ .map( location ->
+ {
+ try
+ {
+ FetchValue fetch = new FetchValue.Builder( location ).build();
+ FetchValue.Response response = riakClient.execute( fetch );
+ String jsonState = response.getValue( String.class );
+ return new StringReader( jsonState );
+ }
+ catch( InterruptedException | ExecutionException ex )
+ {
+ throw new EntityStoreException( "Unable to get entity states.", ex );
+ }
+ } );
+ }
+ catch( InterruptedException | ExecutionException ex )
+ {
+ throw new EntityStoreException( "Unable to get entity states.", ex );
+ }
}