You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@archiva.apache.org by ma...@apache.org on 2019/08/20 21:07:28 UTC

[archiva] branch feature/storage_refactoring updated: Fixing cassandra stream api

This is an automated email from the ASF dual-hosted git repository.

martin_s pushed a commit to branch feature/storage_refactoring
in repository https://gitbox.apache.org/repos/asf/archiva.git


The following commit(s) were added to refs/heads/feature/storage_refactoring by this push:
     new cda4ac8  Fixing cassandra stream api
cda4ac8 is described below

commit cda4ac8085f840060ced0163d21f1e34407411d8
Author: Martin Stockhammer <ma...@apache.org>
AuthorDate: Tue Aug 20 23:07:22 2019 +0200

    Fixing cassandra stream api
---
 .../repository/AbstractMetadataRepositoryTest.java |  2 +
 .../plugins/metadata-store-cassandra/pom.xml       |  1 +
 .../cassandra/CassandraMetadataRepository.java     | 97 ++++++++++++++++------
 3 files changed, 75 insertions(+), 25 deletions(-)

diff --git a/archiva-modules/metadata/metadata-repository-api/src/test/java/org/apache/archiva/metadata/repository/AbstractMetadataRepositoryTest.java b/archiva-modules/metadata/metadata-repository-api/src/test/java/org/apache/archiva/metadata/repository/AbstractMetadataRepositoryTest.java
index 2232a7b..a13abdc 100644
--- a/archiva-modules/metadata/metadata-repository-api/src/test/java/org/apache/archiva/metadata/repository/AbstractMetadataRepositoryTest.java
+++ b/archiva-modules/metadata/metadata-repository-api/src/test/java/org/apache/archiva/metadata/repository/AbstractMetadataRepositoryTest.java
@@ -893,6 +893,7 @@ public abstract class AbstractMetadataRepositoryTest
                 assertNotNull( str );
                 List<TestMetadataFacet> result = str.collect( Collectors.toList( ) );
                 assertEquals( 1, result.size( ) );
+                assertNotNull( result.get( 0 ) );
                 assertEquals( TEST_NAME, result.get( 0 ).getName( ) );
             } );
 
@@ -918,6 +919,7 @@ public abstract class AbstractMetadataRepositoryTest
                 assertNotNull( str );
                 List<TestMetadataFacet> result = str.collect( Collectors.toList( ) );
                 assertEquals( 100, result.size( ) );
+                assertNotNull( result.get( 0 ) );
                 for (int i=0; i<10; i++) {
                     log.info( "Result {}", result.get( i ).getName( ) );
                 }
diff --git a/archiva-modules/plugins/metadata-store-cassandra/pom.xml b/archiva-modules/plugins/metadata-store-cassandra/pom.xml
index bd905ac..d8ec3a9 100644
--- a/archiva-modules/plugins/metadata-store-cassandra/pom.xml
+++ b/archiva-modules/plugins/metadata-store-cassandra/pom.xml
@@ -260,6 +260,7 @@
               <archiva.repositorySessionFactory.id>cassandra</archiva.repositorySessionFactory.id>
               <appserver.base>${project.build.directory}/appserver-base</appserver.base>
             </systemPropertyVariables>
+            <trimStackTrace>false</trimStackTrace>
           </configuration>
         </plugin>
       </plugins>
diff --git a/archiva-modules/plugins/metadata-store-cassandra/src/main/java/org/apache/archiva/metadata/repository/cassandra/CassandraMetadataRepository.java b/archiva-modules/plugins/metadata-store-cassandra/src/main/java/org/apache/archiva/metadata/repository/cassandra/CassandraMetadataRepository.java
index 207da55..2b84b0d 100644
--- a/archiva-modules/plugins/metadata-store-cassandra/src/main/java/org/apache/archiva/metadata/repository/cassandra/CassandraMetadataRepository.java
+++ b/archiva-modules/plugins/metadata-store-cassandra/src/main/java/org/apache/archiva/metadata/repository/cassandra/CassandraMetadataRepository.java
@@ -85,6 +85,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Spliterator;
 import java.util.UUID;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Stream;
@@ -1526,26 +1527,39 @@ public class CassandraMetadataRepository
         return facets;
     }
 
-    private <T> Spliterator<T> createResultSpliterator( QueryResult<OrderedRows<String, String, String>> result, Function<Row<String, String, String>, T> converter) throws MetadataRepositoryException
+    private <T> Spliterator<T> createResultSpliterator( QueryResult<OrderedRows<String, String, String>> result, BiFunction<Row<String, String, String>, T, T> converter) throws MetadataRepositoryException
     {
         final int size = result.get().getCount();
+        final Iterator<Row<String, String, String>> it = result.get( ).iterator( );
+
         return new Spliterator<T>( )
         {
+            private T lastItem = null;
+
             @Override
             public boolean tryAdvance( Consumer<? super T> action )
             {
                 if (size>=1)
                 {
-                    for ( Row<String, String, String> row : result.get( ) )
+                    if(it.hasNext())
                     {
-                        T item = converter.apply( row );
-                        if ( item != null )
+                        while ( it.hasNext( ) )
                         {
-                            action.accept( item );
-                            return true;
+                            Row<String, String, String> row = it.next( );
+                            T item = converter.apply( row, lastItem );
+                            if ( item != null && lastItem !=null && item != lastItem )
+                            {
+                                action.accept( lastItem );
+                                lastItem = item;
+                                return true;
+                            }
+                            lastItem = item;
                         }
+                        action.accept( lastItem );
+                        return true;
+                    } else {
+                        return false;
                     }
-
                 }
                 return false;
             }
@@ -1570,6 +1584,20 @@ public class CassandraMetadataRepository
         };
     }
 
+
+    /**
+     * Implementation is not very performant, because sorting is part of the stream. I do not know how to specify the sort
+     * in the query.
+     * 
+     * @param session
+     * @param repositoryId
+     * @param facetClazz
+     * @param offset
+     * @param maxEntries
+     * @param <T>
+     * @return
+     * @throws MetadataRepositoryException
+     */
     @Override
     public <T extends MetadataFacet> Stream<T> getMetadataFacetStream( RepositorySession session, String repositoryId, Class<T> facetClazz, long offset, long maxEntries ) throws MetadataRepositoryException
     {
@@ -1578,21 +1606,37 @@ public class CassandraMetadataRepository
 
         QueryResult<OrderedRows<String, String, String>> result = HFactory //
             .createRangeSlicesQuery( keyspace, ss, ss, ss ) //
-            .setColumnFamily( cassandraArchivaManager.getMetadataFacetFamilyName() ) //
-            .setColumnNames( NAME.toString() ) //
-            .addEqualsExpression( REPOSITORY_NAME.toString(), repositoryId ) //
-            .addEqualsExpression( FACET_ID.toString(), facetId ) //
-            .execute();
+            .setColumnFamily( cassandraArchivaManager.getMetadataFacetFamilyName( ) ) //
+            .setColumnNames( NAME.toString( ), KEY.toString( ), VALUE.toString( ) ) //
+            .addEqualsExpression( REPOSITORY_NAME.toString( ), repositoryId ) //
+            .addEqualsExpression( FACET_ID.toString( ), facetId ) //
+            .setRange( null, null, false, Integer.MAX_VALUE )
+            .setRowCount( Integer.MAX_VALUE )
+            .execute( );
+
 
-        return StreamSupport.stream( createResultSpliterator( result, ( Row<String, String, String> row)-> {
+
+        return StreamSupport.stream( createResultSpliterator( result, ( Row<String, String, String> row, T lastItem)-> {
             ColumnSlice<String, String> columnSlice = row.getColumnSlice();
             String name = getStringValue( columnSlice, NAME.toString( ) );
-            T metadataFacet = metadataFacetFactory.createMetadataFacet( repositoryId, name );
-            Map<String, String> map = new HashMap<>( );
-            map.put( getStringValue( columnSlice, KEY.toString() ), getStringValue( columnSlice, VALUE.toString() ) );
-            metadataFacet.fromProperties( map );
-            return metadataFacet;
-        }), false );
+            T updateItem;
+            if (lastItem!=null && lastItem.getName().equals(name))
+            {
+                updateItem = lastItem;
+            } else
+            {
+                updateItem = metadataFacetFactory.createMetadataFacet( repositoryId, name );
+            }
+            String key = getStringValue( columnSlice, KEY.toString() );
+            if (StringUtils.isNotEmpty( key ))
+            {
+                Map<String, String> map = new HashMap<>( );
+                map.put( key , getStringValue( columnSlice, VALUE.toString( ) ) );
+                updateItem.fromProperties( map );
+            }
+            return updateItem;
+
+        }), false ).sorted( (f1, f2) -> f1.getName()!=null ? f1.getName().compareTo( f2.getName() ) : 1 ).skip( offset ).limit( maxEntries );
     }
 
     @Override
@@ -1603,11 +1647,14 @@ public class CassandraMetadataRepository
     }
 
     @Override
-    public MetadataFacet getMetadataFacet( RepositorySession session, final String repositoryId, final String facetId, final String name )
+    public <T extends MetadataFacet> T getMetadataFacet( RepositorySession session, final String repositoryId, final Class<T> facetClazz, final String name )
         throws MetadataRepositoryException
     {
-
-        MetadataFacetFactory metadataFacetFactory = getFacetFactory( facetId );
+        final MetadataFacetFactory<T> metadataFacetFactory = getFacetFactory( facetClazz );
+        if (metadataFacetFactory==null) {
+            return null;
+        }
+        final String facetId = metadataFacetFactory.getFacetId( );
         if ( metadataFacetFactory == null )
         {
             return null;
@@ -1622,7 +1669,7 @@ public class CassandraMetadataRepository
             .addEqualsExpression( NAME.toString(), name ) //
             .execute();
 
-        MetadataFacet metadataFacet = metadataFacetFactory.createMetadataFacet( repositoryId, name );
+        T metadataFacet = metadataFacetFactory.createMetadataFacet( repositoryId, name );
         int size = result.get().getCount();
         if ( size < 1 )
         {
@@ -1639,9 +1686,9 @@ public class CassandraMetadataRepository
     }
 
     @Override
-    public <T extends MetadataFacet> T getMetadataFacet( RepositorySession session, String repositoryId, Class<T> clazz, String name ) throws MetadataRepositoryException
+    public MetadataFacet getMetadataFacet( RepositorySession session, String repositoryId, String facetId, String name ) throws MetadataRepositoryException
     {
-        return null;
+        return getMetadataFacet( session, repositoryId, getFactoryClassForId( facetId ), name );
     }
 
     @Override