You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2013/07/21 02:45:29 UTC

svn commit: r1505247 - in /gora/trunk: gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java gora-tutorial/conf/gora.properties

Author: lewismc
Date: Sun Jul 21 00:45:29 2013
New Revision: 1505247

URL: http://svn.apache.org/r1505247
Log:
reformat SolrStore for correct 2 line indents

Modified:
    gora/trunk/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java
    gora/trunk/gora-tutorial/conf/gora.properties

Modified: gora/trunk/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java?rev=1505247&r1=1505246&r2=1505247&view=diff
==============================================================================
--- gora/trunk/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java (original)
+++ gora/trunk/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java Sun Jul 21 00:45:29 2013
@@ -52,475 +52,455 @@ import org.jdom.input.SAXBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SolrStore<K, T extends PersistentBase>
-    extends DataStoreBase<K, T> {
+public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
     
-    private static final Logger LOG = LoggerFactory.getLogger( SolrStore.class );
+  private static final Logger LOG = LoggerFactory.getLogger( SolrStore.class );
 
-    protected static final String DEFAULT_MAPPING_FILE = "gora-solr-mapping.xml";
+  protected static final String DEFAULT_MAPPING_FILE = "gora-solr-mapping.xml";
 
-    protected static final String SOLR_URL_PROPERTY = "solr.url";
+  protected static final String SOLR_URL_PROPERTY = "solr.url";
 
-    protected static final String SOLR_CONFIG_PROPERTY = "solr.config";
+  protected static final String SOLR_CONFIG_PROPERTY = "solr.config";
 
-    protected static final String SOLR_SCHEMA_PROPERTY = "solr.schema";
+  protected static final String SOLR_SCHEMA_PROPERTY = "solr.schema";
     
-    protected static final String SOLR_BATCH_SIZE_PROPERTY = "solr.batchSize";
-
-    protected static final String SOLR_COMMIT_WITHIN_PROPERTY = "solr.commitWithin";
-
-    protected static final String SOLR_RESULTS_SIZE_PROPERTY = "solr.resultsSize";
+  protected static final String SOLR_BATCH_SIZE_PROPERTY = "solr.batchSize";
     
-    protected static final int DEFAULT_BATCH_SIZE = 100;
-
-    protected static final int DEFAULT_COMMIT_WITHIN = 1000;
-
-    protected static final int DEFAULT_RESULTS_SIZE = 100;
-
-    private SolrMapping mapping;
-
-    private String solrServerUrl, solrConfig, solrSchema;
-
-    private SolrServer server, adminServer;
+  //protected static final String SOLR_SOLRJSERVER_IMPL = "solr.solrjserver";
 
-    private ArrayList<SolrInputDocument> batch;
-
-    private int batchSize = DEFAULT_BATCH_SIZE;
-
-    private int commitWithin = DEFAULT_COMMIT_WITHIN;
-
-    private int resultsSize = DEFAULT_RESULTS_SIZE;
-
-    @Override
-    public void initialize( Class<K> keyClass, Class<T> persistentClass, Properties properties ) {
-        super.initialize( keyClass, persistentClass, properties );
-        String mappingFile = DataStoreFactory.getMappingFile( properties, this, DEFAULT_MAPPING_FILE );
-        try {
-            mapping = readMapping( mappingFile );
-        }
-        catch ( IOException e ) {
-            LOG.error( e.getMessage() );
-            LOG.error( e.getStackTrace().toString() );
-        }
+  protected static final String SOLR_COMMIT_WITHIN_PROPERTY = "solr.commitWithin";
 
-        solrServerUrl = DataStoreFactory.findProperty( properties, this, SOLR_URL_PROPERTY, null );
-        solrConfig = DataStoreFactory.findProperty( properties, this, SOLR_CONFIG_PROPERTY, null );
-        solrSchema = DataStoreFactory.findProperty( properties, this, SOLR_SCHEMA_PROPERTY, null );
-        LOG.info( "Using Solr server at " + solrServerUrl );
-        adminServer = new HttpSolrServer( solrServerUrl );
-        server = new HttpSolrServer( solrServerUrl + "/" + mapping.getCoreName() );
-        if ( autoCreateSchema ) {
-            createSchema();
-        }
-        String batchSizeString = DataStoreFactory.findProperty( properties, this, SOLR_BATCH_SIZE_PROPERTY, null );
-        if ( batchSizeString != null ) {
-            try {
-                batchSize = Integer.parseInt( batchSizeString );
-            }
-            catch ( NumberFormatException nfe ) {
-                LOG.warn( "Invalid batch size '" + batchSizeString + "', using default " + DEFAULT_BATCH_SIZE );
-            }
-        }
-        batch = new ArrayList<SolrInputDocument>( batchSize );
-        String commitWithinString = DataStoreFactory.findProperty( properties, this, SOLR_COMMIT_WITHIN_PROPERTY, null );
-        if ( commitWithinString != null ) {
-            try {
-                commitWithin = Integer.parseInt( commitWithinString );
-            }
-            catch ( NumberFormatException nfe ) {
-                LOG.warn( "Invalid commit within '" + commitWithinString + "', using default " + DEFAULT_COMMIT_WITHIN );
-            }
-        }
-        String resultsSizeString = DataStoreFactory.findProperty( properties, this, SOLR_RESULTS_SIZE_PROPERTY, null );
-        if ( resultsSizeString != null ) {
-            try {
-                resultsSize = Integer.parseInt( resultsSizeString );
-            }
-            catch ( NumberFormatException nfe ) {
-                LOG.warn( "Invalid results size '" + resultsSizeString + "', using default " + DEFAULT_RESULTS_SIZE );
-            }
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private SolrMapping readMapping( String filename )
-        throws IOException {
-        SolrMapping map = new SolrMapping();
-        try {
-            SAXBuilder builder = new SAXBuilder();
-            Document doc = builder.build( getClass().getClassLoader().getResourceAsStream( filename ) );
-
-            List<Element> classes = doc.getRootElement().getChildren( "class" );
-
-            for ( Element classElement : classes ) {
-                if ( classElement.getAttributeValue( "keyClass" ).equals( keyClass.getCanonicalName() )
-                    && classElement.getAttributeValue( "name" ).equals( persistentClass.getCanonicalName() ) ) {
-
-                    String tableName = getSchemaName( classElement.getAttributeValue( "table" ), persistentClass );
-                    map.setCoreName( tableName );
-
-                    Element primaryKeyEl = classElement.getChild( "primarykey" );
-                    map.setPrimaryKey( primaryKeyEl.getAttributeValue( "column" ) );
-
-                    List<Element> fields = classElement.getChildren( "field" );
-
-                    for ( Element field : fields ) {
-                        String fieldName = field.getAttributeValue( "name" );
-                        String columnName = field.getAttributeValue( "column" );
-                        map.addField( fieldName, columnName );
-                    }
-                    break;
-                }
-            }
-        }
-        catch ( Exception ex ) {
-            throw new IOException( ex );
-        }
-
-        return map;
-    }
-
-    public SolrMapping getMapping() {
-        return mapping;
-    }
-
-    @Override
-    public String getSchemaName() {
-        return mapping.getCoreName();
-    }
-
-    @Override
-    public void createSchema() {
-        try {
-            if ( !schemaExists() )
-                CoreAdminRequest.createCore( mapping.getCoreName(), mapping.getCoreName(), adminServer, solrConfig,
-                                             solrSchema );
-        }
-        catch ( Exception e ) {
-            LOG.error( e.getMessage(), e.getStackTrace().toString() );
-        }
-    }
+  protected static final String SOLR_RESULTS_SIZE_PROPERTY = "solr.resultsSize";
+    
+  protected static final int DEFAULT_BATCH_SIZE = 100;
 
-    @Override
-    /** Default implementation deletes and recreates the schema*/
-    public void truncateSchema() {
-        try {
-            server.deleteByQuery( "*:*" );
-            server.commit();
-        }
-        catch ( Exception e ) {
-            // ignore?
-            LOG.error( e.getMessage(), e.getStackTrace().toString() );
-        }
-    }
+  protected static final int DEFAULT_COMMIT_WITHIN = 1000;
 
-    @Override
-    public void deleteSchema() {
-        // XXX should this be only in truncateSchema ???
-        try {
-            server.deleteByQuery( "*:*" );
-            server.commit();
-        }
-        catch ( Exception e ) {
-            // ignore?
-            // LOG.error(e.getMessage());
-            // LOG.error(e.getStackTrace().toString());
-        }
-        try {
-            CoreAdminRequest.unloadCore( mapping.getCoreName(), adminServer );
-        }
-        catch ( Exception e ) {
-            if ( e.getMessage().contains( "No such core" ) ) {
-                return; // it's ok, the core is not there
-            }
-            else {
-                LOG.error( e.getMessage(), e.getStackTrace().toString() );
-            }
-        }
-    }
+  protected static final int DEFAULT_RESULTS_SIZE = 100;
 
-    @Override
-    public boolean schemaExists() {
-        boolean exists = false;
-        try {
-            CoreAdminResponse rsp = CoreAdminRequest.getStatus( mapping.getCoreName(), adminServer );
-            exists = rsp.getUptime( mapping.getCoreName() ) != null;
-        }
-        catch ( Exception e ) {
-            LOG.error( e.getMessage(), e.getStackTrace().toString() );
-        }
-        return exists;
-    }
+  private SolrMapping mapping;
 
-    private static final String toDelimitedString( String[] arr, String sep ) {
-        if ( arr == null || arr.length == 0 ) {
-            return "";
-        }
-        StringBuilder sb = new StringBuilder();
-        for ( int i = 0; i < arr.length; i++ ) {
-            if ( i > 0 )
-                sb.append( sep );
-            sb.append( arr[i] );
-        }
-        return sb.toString();
-    }
-
-    public static String escapeQueryKey( String key ) {
-        if ( key == null ) {
-            return null;
-        }
-        StringBuilder sb = new StringBuilder();
-        for ( int i = 0; i < key.length(); i++ ) {
-            char c = key.charAt( i );
-            switch ( c ) {
-                case ':':
-                case '*':
-                    sb.append( "\\" + c );
-                    break;
-                default:
-                    sb.append( c );
-            }
-        }
-        return sb.toString();
-    }
-
-    @Override
-    public T get( K key, String[] fields ) {
-        ModifiableSolrParams params = new ModifiableSolrParams();
-        params.set( CommonParams.QT, "/get" );
-        params.set( CommonParams.FL, toDelimitedString( fields, "," ) );
-        params.set( "id",  key.toString() );
-        try {
-            QueryResponse rsp = server.query( params );
-            Object o = rsp.getResponse().get( "doc" );
-            if ( o == null ) {
-                return null;
-            }
-            return newInstance( (SolrDocument)o, fields );
-        }
-        catch ( Exception e ) {
-            LOG.error( e.getMessage(), e.getStackTrace().toString() );
-        }
+  private String solrServerUrl, solrConfig, solrSchema;
+
+  private SolrServer server, adminServer;
+
+  private ArrayList<SolrInputDocument> batch;
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private int commitWithin = DEFAULT_COMMIT_WITHIN;
+
+  private int resultsSize = DEFAULT_RESULTS_SIZE;
+
+  @Override
+  public void initialize( Class<K> keyClass, Class<T> persistentClass, Properties properties ) {
+    super.initialize( keyClass, persistentClass, properties );
+    String mappingFile = DataStoreFactory.getMappingFile( properties, this, DEFAULT_MAPPING_FILE );
+    try {
+      mapping = readMapping( mappingFile );
+    }
+    catch ( IOException e ) {
+      LOG.error( e.getMessage() );
+      LOG.error( e.getStackTrace().toString() );
+    }
+
+    solrServerUrl = DataStoreFactory.findProperty( properties, this, SOLR_URL_PROPERTY, null );
+    solrConfig = DataStoreFactory.findProperty( properties, this, SOLR_CONFIG_PROPERTY, null );
+    solrSchema = DataStoreFactory.findProperty( properties, this, SOLR_SCHEMA_PROPERTY, null );
+    LOG.info( "Using Solr server at " + solrServerUrl );
+    adminServer = new HttpSolrServer( solrServerUrl );
+    server = new HttpSolrServer( solrServerUrl + "/" + mapping.getCoreName() );
+    if ( autoCreateSchema ) {
+      createSchema();
+    }
+    String batchSizeString = DataStoreFactory.findProperty( properties, this, SOLR_BATCH_SIZE_PROPERTY, null );
+    if ( batchSizeString != null ) {
+      try {
+        batchSize = Integer.parseInt( batchSizeString );
+      } catch ( NumberFormatException nfe ) {
+        LOG.warn( "Invalid batch size '" + batchSizeString + "', using default " + DEFAULT_BATCH_SIZE );
+      }
+    }
+    batch = new ArrayList<SolrInputDocument>( batchSize );
+    String commitWithinString = DataStoreFactory.findProperty( properties, this, SOLR_COMMIT_WITHIN_PROPERTY, null );
+    if ( commitWithinString != null ) {
+      try {
+        commitWithin = Integer.parseInt( commitWithinString );
+      } catch ( NumberFormatException nfe ) {
+        LOG.warn( "Invalid commit within '" + commitWithinString + "', using default " + DEFAULT_COMMIT_WITHIN );
+      }
+    }
+    String resultsSizeString = DataStoreFactory.findProperty( properties, this, SOLR_RESULTS_SIZE_PROPERTY, null );
+    if ( resultsSizeString != null ) {
+      try {
+        resultsSize = Integer.parseInt( resultsSizeString );
+      } catch ( NumberFormatException nfe ) {
+        LOG.warn( "Invalid results size '" + resultsSizeString + "', using default " + DEFAULT_RESULTS_SIZE );
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private SolrMapping readMapping( String filename ) throws IOException {
+    SolrMapping map = new SolrMapping();
+    try {
+      SAXBuilder builder = new SAXBuilder();
+      Document doc = builder.build( getClass().getClassLoader().getResourceAsStream( filename ) );
+
+      List<Element> classes = doc.getRootElement().getChildren( "class" );
+
+      for ( Element classElement : classes ) {
+        if ( classElement.getAttributeValue( "keyClass" ).equals( keyClass.getCanonicalName() )
+            && classElement.getAttributeValue( "name" ).equals( persistentClass.getCanonicalName() ) ) {
+
+          String tableName = getSchemaName( classElement.getAttributeValue( "table" ), persistentClass );
+          map.setCoreName( tableName );
+
+          Element primaryKeyEl = classElement.getChild( "primarykey" );
+          map.setPrimaryKey( primaryKeyEl.getAttributeValue( "column" ) );
+
+          List<Element> fields = classElement.getChildren( "field" );
+
+          for ( Element field : fields ) {
+            String fieldName = field.getAttributeValue( "name" );
+            String columnName = field.getAttributeValue( "column" );
+            map.addField( fieldName, columnName );
+          }
+          break;
+        }
+      }
+    } catch ( Exception ex ) {
+      throw new IOException( ex );
+    }
+
+    return map;
+  }
+
+  public SolrMapping getMapping() {
+    return mapping;
+  }
+
+  @Override
+  public String getSchemaName() {
+    return mapping.getCoreName();
+  }
+
+  @Override
+  public void createSchema() {
+    try {
+      if ( !schemaExists() )
+          CoreAdminRequest.createCore( mapping.getCoreName(), mapping.getCoreName(), adminServer, solrConfig,
+          solrSchema );
+    } catch ( Exception e ) {
+      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+    }
+  }
+
+  @Override
+  /** Default implementation deletes and recreates the schema*/
+  public void truncateSchema() {
+    try {
+      server.deleteByQuery( "*:*" );
+      server.commit();
+    } catch ( Exception e ) {
+      // ignore?
+      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+    }
+  }
+
+  @Override
+  public void deleteSchema() {
+    // XXX should this be only in truncateSchema ???
+    try {
+      server.deleteByQuery( "*:*" );
+      server.commit();
+    } catch ( Exception e ) {
+    // ignore?
+    // LOG.error(e.getMessage());
+    // LOG.error(e.getStackTrace().toString());
+    }
+    try {
+      CoreAdminRequest.unloadCore( mapping.getCoreName(), adminServer );
+    } catch ( Exception e ) {
+      if ( e.getMessage().contains( "No such core" ) ) {
+        return; // it's ok, the core is not there
+      } else {
+        LOG.error( e.getMessage(), e.getStackTrace().toString() );
+      }
+    }
+  }
+
+  @Override
+  public boolean schemaExists() {
+    boolean exists = false;
+    try {
+      CoreAdminResponse rsp = CoreAdminRequest.getStatus( mapping.getCoreName(), adminServer );
+      exists = rsp.getUptime( mapping.getCoreName() ) != null;
+    } catch ( Exception e ) {
+      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+    }
+    return exists;
+  }
+
+  private static final String toDelimitedString( String[] arr, String sep ) {
+    if ( arr == null || arr.length == 0 ) {
+      return "";
+    }
+    StringBuilder sb = new StringBuilder();
+    for ( int i = 0; i < arr.length; i++ ) {
+      if ( i > 0 )
+        sb.append( sep );
+        sb.append( arr[i] );
+    }
+    return sb.toString();
+  }
+
+  public static String escapeQueryKey( String key ) {
+    if ( key == null ) {
+      return null;
+    }
+    StringBuilder sb = new StringBuilder();
+    for ( int i = 0; i < key.length(); i++ ) {
+      char c = key.charAt( i );
+      switch ( c ) {
+        case ':':
+        case '*':
+          sb.append( "\\" + c );
+          break;
+        default:
+        sb.append( c );
+      }
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public T get( K key, String[] fields ) {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set( CommonParams.QT, "/get" );
+    params.set( CommonParams.FL, toDelimitedString( fields, "," ) );
+    params.set( "id",  key.toString() );
+    try {
+      QueryResponse rsp = server.query( params );
+      Object o = rsp.getResponse().get( "doc" );
+      if ( o == null ) {
         return null;
-    }
-
-    public T newInstance( SolrDocument doc, String[] fields )
-        throws IOException {
-        T persistent = newPersistent();
-        if ( fields == null ) {
-            fields = fieldMap.keySet().toArray( new String[fieldMap.size()] );
-        }
-        String pk = mapping.getPrimaryKey();
-        for ( String f : fields ) {
-            Field field = fieldMap.get( f );
-            Schema fieldSchema = field.schema();
-            String sf = null;
-            if ( pk.equals( f ) ) {
-                sf = f;
-            }
-            else {
-                sf = mapping.getSolrField( f );                
-            }
-            Object sv = doc.get( sf );
-            Object v;
-            if ( sv == null ) {
-                continue;
-            }
-            switch ( fieldSchema.getType() ) {
-                case MAP:
-                case ARRAY:
-                case RECORD:
-                    v = IOUtils.deserialize( (byte[]) sv, datumReader, fieldSchema, persistent.get( field.pos() ) );
-                    persistent.put( field.pos(), v );
-                    break;
-                case ENUM:
-                    v = AvroUtils.getEnumValue( fieldSchema, (String) sv );
-                    persistent.put( field.pos(), v );
-                    break;
-                case FIXED:
-                    throw new IOException( "???" );
-                    // break;
-                case BYTES:
-                    persistent.put( field.pos(), ByteBuffer.wrap( (byte[]) sv ) );
-                    break;
-                case BOOLEAN:
-                case DOUBLE:
-                case FLOAT:
-                case INT:
-                case LONG:
-                    persistent.put( field.pos(), sv );
-                    break;
-                case STRING:
-                    persistent.put( field.pos(), new Utf8( sv.toString() ) );
-                    break;
-                case UNION:
-                    LOG.error( "Union is not supported yet" );
-                    break;
-                default:
-                    LOG.error( "Unknown field type: " + fieldSchema.getType() );
-            }
-            persistent.setDirty( field.pos() );
-        }
-        persistent.clearDirty();
-        return persistent;
-    }
-
-    @Override
-    public void put( K key, T persistent ) {
-        Schema schema = persistent.getSchema();
-        StateManager stateManager = persistent.getStateManager();
-        if ( !stateManager.isDirty( persistent ) ) {
-            // nothing to do
-            return;
-        }
-        SolrInputDocument doc = new SolrInputDocument();
-        // add primary key
-        doc.addField( mapping.getPrimaryKey(), key );
-        // populate the doc
-        List<Field> fields = schema.getFields();
-        for ( Field field : fields ) {
-            String sf = mapping.getSolrField( field.name() );
-            // Solr will append values to fields in a SolrInputDocument, even the key
-            // mapping won't find the primary
-            if ( sf == null ) {
-                continue;
-            }
-            Schema fieldSchema = field.schema();
-            Object v = persistent.get( field.pos() );
-            if ( v == null ) {
-                continue;
-            }
-            switch ( fieldSchema.getType() ) {
-                case MAP:
-                case ARRAY:
-                case RECORD:
-                    byte[] data = null;
-                    try {
-                        data = IOUtils.serialize( datumWriter, fieldSchema, v );
-                    }
-                    catch ( IOException e ) {
-                        LOG.error( e.getMessage(), e.getStackTrace().toString() );
-                    }
-                    doc.addField( sf, data );
-                    break;
-                case BYTES:
-                    doc.addField( sf, ( (ByteBuffer) v ).array() );
-                    break;
-                case ENUM:
-                case STRING:
-                    doc.addField( sf, v.toString() );
-                    break;
-                case BOOLEAN:
-                case DOUBLE:
-                case FLOAT:
-                case INT:
-                case LONG:
-                    doc.addField( sf, v );
-                    break;
-                case UNION:
-                    LOG.error( "Union is not supported yet" );
-                    break;
-                default:
-                    LOG.error( "Unknown field type: " + fieldSchema.getType() );
-            }
-        }
-        System.out.println( "DOCUMENT: " + doc );
-        batch.add( doc );
-        if ( batch.size() >= batchSize ) {
-            try {
-                add( batch, commitWithin );
-                batch.clear();
-            }
-            catch ( Exception e ) {
-                LOG.error( e.getMessage(), e.getStackTrace().toString() );
-            }
-        }
-    }
-
-    @Override
-    public boolean delete( K key ) {
-        String keyField = mapping.getPrimaryKey();
-        try {
-            UpdateResponse rsp = server.deleteByQuery( keyField + ":" + escapeQueryKey( key.toString() ) );
-            server.commit();
-            LOG.info( rsp.toString() );
-            return true;
-        }
-        catch ( Exception e ) {
-            LOG.error( e.getMessage(), e.getStackTrace().toString() );
-        }
-        return false;
-    }
-
-    @Override
-    public long deleteByQuery( Query<K, T> query ) {
-        String q = ( (SolrQuery<K, T>) query ).toSolrQuery();
-        try {
-            UpdateResponse rsp = server.deleteByQuery( q );
-            server.commit();
-            LOG.info( rsp.toString() );
-        }
-        catch ( Exception e ) {
-            LOG.error( e.getMessage(), e.getStackTrace().toString() );
-        }
-        return 0;
-    }
-
-    @Override
-    public Result<K, T> execute( Query<K, T> query ) {
-        try {
-            return new SolrResult<K, T>( this, query, server, resultsSize );
-        }
-        catch ( IOException e ) {
+      }
+      return newInstance( (SolrDocument)o, fields );
+    } catch ( Exception e ) {
+      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+    }
+    return null;
+  }
+
+  public T newInstance( SolrDocument doc, String[] fields )
+      throws IOException {
+    T persistent = newPersistent();
+    if ( fields == null ) {
+      fields = fieldMap.keySet().toArray( new String[fieldMap.size()] );
+    }
+    String pk = mapping.getPrimaryKey();
+    for ( String f : fields ) {
+      Field field = fieldMap.get( f );
+      Schema fieldSchema = field.schema();
+      String sf = null;
+      if ( pk.equals( f ) ) {
+        sf = f;
+      } else {
+        sf = mapping.getSolrField( f );                
+      }
+      Object sv = doc.get( sf );
+      Object v;
+      if ( sv == null ) {
+        continue;
+      }
+      switch ( fieldSchema.getType() ) {
+        case MAP:
+        case ARRAY:
+        case RECORD:
+          v = IOUtils.deserialize( (byte[]) sv, datumReader, fieldSchema, persistent.get( field.pos() ) );
+          persistent.put( field.pos(), v );
+          break;
+        case ENUM:
+          v = AvroUtils.getEnumValue( fieldSchema, (String) sv );
+          persistent.put( field.pos(), v );
+          break;
+        case FIXED:
+          throw new IOException( "???" );
+          // break;
+        case BYTES:
+          persistent.put( field.pos(), ByteBuffer.wrap( (byte[]) sv ) );
+          break;
+        case BOOLEAN:
+        case DOUBLE:
+        case FLOAT:
+        case INT:
+        case LONG:
+          persistent.put( field.pos(), sv );
+          break;
+        case STRING:
+          persistent.put( field.pos(), new Utf8( sv.toString() ) );
+          break;
+        case UNION:
+          LOG.error( "Union is not supported yet" );
+          break;
+        default:
+          LOG.error( "Unknown field type: " + fieldSchema.getType() );
+      }
+      persistent.setDirty( field.pos() );
+    }
+    persistent.clearDirty();
+    return persistent;
+  }
+
+  @Override
+  public void put( K key, T persistent ) {
+    Schema schema = persistent.getSchema();
+    StateManager stateManager = persistent.getStateManager();
+    if ( !stateManager.isDirty( persistent ) ) {
+      // nothing to do
+      return;
+    }
+    SolrInputDocument doc = new SolrInputDocument();
+    // add primary key
+    doc.addField( mapping.getPrimaryKey(), key );
+    // populate the doc
+    List<Field> fields = schema.getFields();
+    for ( Field field : fields ) {
+      String sf = mapping.getSolrField( field.name() );
+      // Solr will append values to fields in a SolrInputDocument, even the key
+      // mapping won't find the primary
+      if ( sf == null ) {
+        continue;
+      }
+      Schema fieldSchema = field.schema();
+      Object v = persistent.get( field.pos() );
+      if ( v == null ) {
+        continue;
+      }
+      switch ( fieldSchema.getType() ) {
+        case MAP:
+        case ARRAY:
+        case RECORD:
+          byte[] data = null;
+          try {
+            data = IOUtils.serialize( datumWriter, fieldSchema, v );
+          } catch ( IOException e ) {
             LOG.error( e.getMessage(), e.getStackTrace().toString() );
-        }
-        return null;
-    }
-
-    @Override
-    public Query<K, T> newQuery() {
-        return new SolrQuery<K, T>( this );
-    }
-
-    @Override
-    public List<PartitionQuery<K, T>> getPartitions( Query<K, T> query )
-        throws IOException {
-        // TODO: implement this using Hadoop DB support
-
-        ArrayList<PartitionQuery<K, T>> partitions = new ArrayList<PartitionQuery<K, T>>();
-        partitions.add( new PartitionQueryImpl<K, T>( query ) );
-
-        return partitions;
-    }
-
-    @Override
-    public void flush() {
-        try {
-            if ( batch.size() > 0 ) {
-                add( batch, commitWithin );
-                batch.clear();
-            }
-        }
-        catch ( Exception e ) {
-            LOG.error(e.getMessage(), e.getStackTrace());
-        }
-    }
-
-    @Override
-    public void close() {
-        // In testing, the index gets closed before the commit in flush() can happen
-        // so an exception gets thrown
-        //flush();
-    }
+          }
+          doc.addField( sf, data );
+          break;
+        case BYTES:
+          doc.addField( sf, ( (ByteBuffer) v ).array() );
+          break;
+        case ENUM:
+        case STRING:
+          doc.addField( sf, v.toString() );
+          break;
+        case BOOLEAN:
+        case DOUBLE:
+        case FLOAT:
+        case INT:
+        case LONG:
+          doc.addField( sf, v );
+          break;
+        case UNION:
+          LOG.error( "Union is not supported yet" );
+          break;
+        default:
+          LOG.error( "Unknown field type: " + fieldSchema.getType() );
+      }
+    }
+    LOG.info( "DOCUMENT: " + doc );
+    batch.add( doc );
+    if ( batch.size() >= batchSize ) {
+      try {
+        add( batch, commitWithin );
+        batch.clear();
+      } catch ( Exception e ) {
+      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+      }
+    }
+  }
+
+  @Override
+  public boolean delete( K key ) {
+    String keyField = mapping.getPrimaryKey();
+    try {
+      UpdateResponse rsp = server.deleteByQuery( keyField + ":" + escapeQueryKey( key.toString() ) );
+      server.commit();
+      LOG.info( rsp.toString() );
+      return true;
+    } catch ( Exception e ) {
+      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+    }
+    return false;
+  }
+
+  @Override
+  public long deleteByQuery( Query<K, T> query ) {
+    String q = ( (SolrQuery<K, T>) query ).toSolrQuery();
+    try {
+      UpdateResponse rsp = server.deleteByQuery( q );
+      server.commit();
+      LOG.info( rsp.toString() );
+    } catch ( Exception e ) {
+      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+    }
+    return 0;
+  }
+
+  @Override
+  public Result<K, T> execute( Query<K, T> query ) {
+    try {
+      return new SolrResult<K, T>( this, query, server, resultsSize );
+    } catch ( IOException e ) {
+      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+    }
+    return null;
+  }
+
+  @Override
+  public Query<K, T> newQuery() {
+    return new SolrQuery<K, T>( this );
+  }
+
+  @Override
+  public List<PartitionQuery<K, T>> getPartitions( Query<K, T> query )
+      throws IOException {
+    // TODO: implement this using Hadoop DB support
+
+    ArrayList<PartitionQuery<K, T>> partitions = new ArrayList<PartitionQuery<K, T>>();
+    partitions.add( new PartitionQueryImpl<K, T>( query ) );
+
+    return partitions;
+  }
+
+  @Override
+  public void flush() {
+    try {
+      if ( batch.size() > 0 ) {
+        add( batch, commitWithin );
+        batch.clear();
+      }
+    } catch ( Exception e ) {
+      LOG.error(e.getMessage(), e.getStackTrace());
+    }
+  }
+
+  @Override
+  public void close() {
+    // In testing, the index gets closed before the commit in flush() can happen
+    // so an exception gets thrown
+    //flush();
+  }
     
-    private void add(ArrayList<SolrInputDocument> batch, int commitWithin) throws SolrServerException, IOException {
-        if (commitWithin == 0) {
-            server.add( batch );
-            server.commit( false, true, true );
-        }
-        else {
-            server.add( batch, commitWithin );            
-        }
+  private void add(ArrayList<SolrInputDocument> batch, int commitWithin) throws SolrServerException, IOException {
+    if (commitWithin == 0) {
+      server.add( batch );
+      server.commit( false, true, true );
+    } else {
+      server.add( batch, commitWithin );            
     }
-    
+  }  
 }

Modified: gora/trunk/gora-tutorial/conf/gora.properties
URL: http://svn.apache.org/viewvc/gora/trunk/gora-tutorial/conf/gora.properties?rev=1505247&r1=1505246&r2=1505247&view=diff
==============================================================================
--- gora/trunk/gora-tutorial/conf/gora.properties (original)
+++ gora/trunk/gora-tutorial/conf/gora.properties Sun Jul 21 00:45:29 2013
@@ -48,4 +48,7 @@ gora.sqlstore.jdbc.url=jdbc:hsqldb:file:
 gora.solrstore.solr.url=http://localhost:8983/solr
 gora.solrstore.solr.commitwithin=0
 gora.solrstore.solr.batchsize=100
+# set which Solrj server impl you wish to use 
+# cloud, concurrent, http, loadbalance
+#gora.solrstore.solr.solrjserver=http