You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2014/01/15 19:43:51 UTC

svn commit: r1558504 - in /gora/trunk: ./ gora-cassandra/src/main/java/org/apache/gora/cassandra/query/ gora-cassandra/src/main/java/org/apache/gora/cassandra/store/ gora-core/src/main/java/org/apache/gora/mapreduce/

Author: lewismc
Date: Wed Jan 15 18:43:51 2014
New Revision: 1558504

URL: http://svn.apache.org/r1558504
Log:
GORA-283 Specify field name for types not being considered in gora-cassandra

Modified:
    gora/trunk/CHANGES.txt
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java

Modified: gora/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/gora/trunk/CHANGES.txt?rev=1558504&r1=1558503&r2=1558504&view=diff
==============================================================================
--- gora/trunk/CHANGES.txt (original)
+++ gora/trunk/CHANGES.txt Wed Jan 15 18:43:51 2014
@@ -4,6 +4,8 @@
 
 Gora Change Log
 
+* GORA-283 Specify field name for types not being considered in gora-cassandra (lewismc)
+
 * GORA-285 Change logging at o.a.g.mapreduce.GoraRecordWriter from INFO to WARN (lewismc)
 
 * GORA-117 gora hbase does not have a mechanism to set the caching on a scanner, which makes for poor performance on map/reduce jobs (alfonsonishikawa)

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1558504&r1=1558503&r2=1558504&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Wed Jan 15 18:43:51 2014
@@ -36,7 +36,7 @@ public abstract class CassandraColumn {
 
   public static final int SUB = 0;
   public static final int SUPER = 1;
-  
+
   private String family;
   private int type;
   private Field field;
@@ -49,7 +49,7 @@ public abstract class CassandraColumn {
   public int getUnionType(){
     return unionType;
   }
-  
+
   public String getFamily() {
     return family;
   }
@@ -65,19 +65,20 @@ public abstract class CassandraColumn {
   public void setField(Field field) {
     this.field = field;
   }
-  
+
   protected Field getField() {
     return this.field;
   }
-  
+
   public abstract ByteBuffer getName();
   public abstract Object getValue();
-  
+
   protected Object fromByteBuffer(Schema schema, ByteBuffer byteBuffer) {
     Object value = null;
     Serializer<?> serializer = GoraSerializerTypeInferer.getSerializer(schema);
     if (serializer == null) {
-      LOG.info("Schema is not supported: " + schema.toString());
+      LOG.warn("Schema: " + schema.getName() + " is not supported. No serializer "
+          + "could be found. Please report this to dev@gora.apache.org");
     } else {
       value = serializer.fromByteBuffer(byteBuffer);
     }

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1558504&r1=1558503&r2=1558504&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Wed Jan 15 18:43:51 2014
@@ -21,7 +21,6 @@ package org.apache.gora.cassandra.query;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-import me.prettyprint.cassandra.serializers.IntegerSerializer;
 import me.prettyprint.cassandra.serializers.StringSerializer;
 import me.prettyprint.hector.api.beans.HColumn;
 import me.prettyprint.hector.api.beans.HSuperColumn;
@@ -29,7 +28,6 @@ import me.prettyprint.hector.api.beans.H
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericArray;
 import org.apache.avro.util.Utf8;
 import org.apache.gora.cassandra.serializers.Utf8Serializer;
 import org.apache.gora.persistency.ListGenericArray;
@@ -119,7 +117,7 @@ public class CassandraSuperColumn extend
         }
         break;
       default:
-        LOG.info("Type not supported: " + type);
+        LOG.warn("Type: " + type.name() + " not supported for field: " + field.name());
     }
     
     return value;

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1558504&r1=1558503&r2=1558504&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Wed Jan 15 18:43:51 2014
@@ -322,25 +322,24 @@ public class CassandraClient<K, T extend
   }
 
   /**
-   * Serialize value to ByteBuffer.
-   * @param value the member value
+   * Serialize value to ByteBuffer using 
+   * {@link org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer#getSerializer(Object)}.
+   * @param value the member value {@link java.lang.Object}.
    * @return ByteBuffer object
    */
-  @SuppressWarnings("unchecked")
   public ByteBuffer toByteBuffer(Object value) {
     ByteBuffer byteBuffer = null;
-    Serializer serializer = GoraSerializerTypeInferer.getSerializer(value);
+    Serializer<Object> serializer = GoraSerializerTypeInferer.getSerializer(value);
     if (serializer == null) {
-      LOG.info("Serializer not found for: " + value.toString());
+      LOG.warn("Serializer not found for: " + value.toString());
     }
     else {
+      LOG.debug(serializer.getClass() + " selected as appropriate Serializer.");
       byteBuffer = serializer.toByteBuffer(value);
     }
-
     if (byteBuffer == null) {
-      LOG.info("value class=" + value.getClass().getName() + " value=" + value + " -> null");
+      LOG.warn("Serialization value for: " + value.getClass().getName() + " = null");
     }
-    
     return byteBuffer;
   }
 

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1558504&r1=1558503&r2=1558504&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Wed Jan 15 18:43:51 2014
@@ -66,7 +66,7 @@ import org.slf4j.LoggerFactory;
  * such as initialization, creating and deleting schemas (Cassandra Keyspaces), etc.  
  */
 public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
-  
+
   /** Logging implementation */
   public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
 
@@ -84,12 +84,12 @@ public class CassandraStore<K, T extends
    * We don't want to lock the entire collection before iterating over the keys, since in the meantime other threads are adding entries to the map.
    */
   private Map<K, T> buffer = Collections.synchronizedMap(new LinkedHashMap<K, T>());
-  
+
   /** The default constructor for CassandraStore */
   public CassandraStore() throws Exception {
     // this.cassandraClient.initialize();
   }
- 
+
   /** 
    * Initialize is called when then the call to 
    * {@link org.apache.gora.store.DataStoreFactory#createDataStore(Class<D> dataStoreClass, Class<K> keyClass, Class<T> persistent, org.apache.hadoop.conf.Configuration conf)}
@@ -144,19 +144,19 @@ public class CassandraStore<K, T extends
    */
   @Override
   public Result<K, T> execute(Query<K, T> query) {
-    
+
     Map<String, List<String>> familyMap = this.cassandraClient.getFamilyMap(query);
     Map<String, String> reverseMap = this.cassandraClient.getReverseMap(query);
-    
+
     CassandraQuery<K, T> cassandraQuery = new CassandraQuery<K, T>();
     cassandraQuery.setQuery(query);
     cassandraQuery.setFamilyMap(familyMap);
-    
+
     CassandraResult<K, T> cassandraResult = new CassandraResult<K, T>(this, query);
     cassandraResult.setReverseMap(reverseMap);
 
     CassandraResultSet<K> cassandraResultSet = new CassandraResultSet<K>();
-    
+
     // We query Cassandra keyspace by families.
     for (String family : familyMap.keySet()) {
       if (family == null) {
@@ -164,17 +164,17 @@ public class CassandraStore<K, T extends
       }
       if (this.cassandraClient.isSuper(family)) {
         addSuperColumns(family, cassandraQuery, cassandraResultSet);
-         
+
       } else {
         addSubColumns(family, cassandraQuery, cassandraResultSet);
       }
     }
-    
+
     cassandraResult.setResultSet(cassandraResultSet);
-    
+
     return cassandraResult;
   }
-  
+
   /**
    * When we add subcolumns, Gora keys are mapped to Cassandra partition keys only. 
    * This is because we follow the Cassandra logic where column family data is 
@@ -184,10 +184,10 @@ public class CassandraStore<K, T extends
       CassandraResultSet cassandraResultSet) {
     // select family columns that are included in the query
     List<Row<K, ByteBuffer, ByteBuffer>> rows = this.cassandraClient.execute(cassandraQuery, family);
-    
+
     for (Row<K, ByteBuffer, ByteBuffer> row : rows) {
       K key = row.getKey();
-      
+
       // find associated row in the resultset
       CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key);
       if (cassandraRow == null) {
@@ -195,16 +195,16 @@ public class CassandraStore<K, T extends
         cassandraResultSet.putRow(key, cassandraRow);
         cassandraRow.setKey(key);
       }
-      
+
       ColumnSlice<ByteBuffer, ByteBuffer> columnSlice = row.getColumnSlice();
-      
+
       for (HColumn<ByteBuffer, ByteBuffer> hColumn : columnSlice.getColumns()) {
         CassandraSubColumn cassandraSubColumn = new CassandraSubColumn();
         cassandraSubColumn.setValue(hColumn);
         cassandraSubColumn.setFamily(family);
         cassandraRow.add(cassandraSubColumn);
       }
-      
+
     }
   }
 
@@ -215,7 +215,7 @@ public class CassandraStore<K, T extends
    */
   private void addSuperColumns(String family, CassandraQuery<K, T> cassandraQuery, 
       CassandraResultSet cassandraResultSet) {
-    
+
     List<SuperRow<K, String, ByteBuffer, ByteBuffer>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family);
     for (SuperRow<K, String, ByteBuffer, ByteBuffer> superRow: superRows) {
       K key = superRow.getKey();
@@ -225,7 +225,7 @@ public class CassandraStore<K, T extends
         cassandraResultSet.putRow(key, cassandraRow);
         cassandraRow.setKey(key);
       }
-      
+
       SuperSlice<String, ByteBuffer, ByteBuffer> superSlice = superRow.getSuperSlice();
       for (HSuperColumn<String, ByteBuffer, ByteBuffer> hSuperColumn: superSlice.getSuperColumns()) {
         CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn();
@@ -242,12 +242,12 @@ public class CassandraStore<K, T extends
    */
   @Override
   public void flush() {
-    
+
     Set<K> keys = this.buffer.keySet();
-    
+
     // this duplicates memory footprint
     K[] keyArray = (K[]) keys.toArray();
-    
+
     // iterating over the key set directly would throw ConcurrentModificationException with java.util.HashMap and subclasses
     for (K key: keyArray) {
       T value = this.buffer.get(key);
@@ -262,7 +262,7 @@ public class CassandraStore<K, T extends
         }
       }
     }
-    
+
     // remove flushed rows
     for (K key: keyArray) {
       this.buffer.remove(key);
@@ -297,7 +297,7 @@ public class CassandraStore<K, T extends
     partitions.add(pqi);
     return partitions;
   }
-  
+
   /**
    * In Cassandra Schemas are referred to as Keyspaces
    * @return Keyspace
@@ -326,51 +326,51 @@ public class CassandraStore<K, T extends
       int fieldPos = field.pos();
       if (value.isDirty(fieldPos)) {
         Object fieldValue = value.get(fieldPos);
-        
+
         // check if field has a nested structure (array, map, or record)
         Schema fieldSchema = field.schema();
         Type type = fieldSchema.getType();
         switch(type) {
-          case RECORD:
-            PersistentBase persistent = (PersistentBase) fieldValue;
-            PersistentBase newRecord = (PersistentBase) persistent.newInstance(new StateManagerImpl());
-            for (Field member: fieldSchema.getFields()) {
-              newRecord.put(member.pos(), persistent.get(member.pos()));
-            }
-            fieldValue = newRecord;
-            break;
-          case MAP:
-            StatefulHashMap map = (StatefulHashMap) fieldValue;
-            StatefulHashMap newMap = new StatefulHashMap();
-            for (Object mapKey : map.keySet()) {
-              newMap.put(mapKey, map.get(mapKey));
-              newMap.putState(mapKey, map.getState(mapKey));
-            }
-            fieldValue = newMap;
-            break;
-          case ARRAY:
-            GenericArray array = (GenericArray) fieldValue;
-            ListGenericArray newArray = new ListGenericArray(fieldSchema.getElementType());
-            Iterator iter = array.iterator();
-            while (iter.hasNext()) {
-              newArray.add(iter.next());
-            }
-            fieldValue = newArray;
-            break;
-          case UNION:
-            // storing the union selected schema, the actual value will be stored as soon as getting out of here
-            // TODO determine which schema we are using: int schemaPos = getUnionSchema(fieldValue,fieldSchema);
-            // and save it p.put( p.getFieldIndex(field.name() + CassandraStore.UNION_COL_SUFIX), schemaPos);
-            break;
+        case RECORD:
+          PersistentBase persistent = (PersistentBase) fieldValue;
+          PersistentBase newRecord = (PersistentBase) persistent.newInstance(new StateManagerImpl());
+          for (Field member: fieldSchema.getFields()) {
+            newRecord.put(member.pos(), persistent.get(member.pos()));
+          }
+          fieldValue = newRecord;
+          break;
+        case MAP:
+          StatefulHashMap map = (StatefulHashMap) fieldValue;
+          StatefulHashMap newMap = new StatefulHashMap();
+          for (Object mapKey : map.keySet()) {
+            newMap.put(mapKey, map.get(mapKey));
+            newMap.putState(mapKey, map.getState(mapKey));
+          }
+          fieldValue = newMap;
+          break;
+        case ARRAY:
+          GenericArray array = (GenericArray) fieldValue;
+          ListGenericArray newArray = new ListGenericArray(fieldSchema.getElementType());
+          Iterator iter = array.iterator();
+          while (iter.hasNext()) {
+            newArray.add(iter.next());
+          }
+          fieldValue = newArray;
+          break;
+        case UNION:
+          // storing the union selected schema, the actual value will be stored as soon as getting out of here
+          // TODO determine which schema we are using: int schemaPos = getUnionSchema(fieldValue,fieldSchema);
+          // and save it p.put( p.getFieldIndex(field.name() + CassandraStore.UNION_COL_SUFIX), schemaPos);
+          break;
         }
-        
+
         p.put(fieldPos, fieldValue);
       }
     }
-    
+
     // this performs a structural modification of the map
     this.buffer.put(key, p);
- }
+  }
 
   /**
    * Add a field to Cassandra according to its type.
@@ -381,67 +381,65 @@ public class CassandraStore<K, T extends
   private void addOrUpdateField(K key, Field field, Object value) {
     Schema schema = field.schema();
     Type type = schema.getType();
-      switch (type) {
-        case STRING:
-        case BOOLEAN:
-        case INT:
-        case LONG:
-        case BYTES:
-        case FLOAT:
-        case DOUBLE:
-        case FIXED:
-          this.cassandraClient.addColumn(key, field.name(), value);
-          break;
-        case RECORD:
-          if (value != null) {
-            if (value instanceof PersistentBase) {
-              PersistentBase persistentBase = (PersistentBase) value;
-              for (Field member: schema.getFields()) {
-                
-                // TODO: hack, do not store empty arrays
-                Object memberValue = persistentBase.get(member.pos());
-                if (memberValue instanceof GenericArray<?>) {
-                  if (((GenericArray)memberValue).size() == 0) {
-                    continue;
-                  }
-                }
-                this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
+    switch (type) {
+    case STRING:
+    case BOOLEAN:
+    case INT:
+    case LONG:
+    case BYTES:
+    case FLOAT:
+    case DOUBLE:
+    case FIXED:
+      this.cassandraClient.addColumn(key, field.name(), value);
+      break;
+    case RECORD:
+      if (value != null) {
+        if (value instanceof PersistentBase) {
+          PersistentBase persistentBase = (PersistentBase) value;
+          for (Field member: schema.getFields()) {
+
+            // TODO: hack, do not store empty arrays
+            Object memberValue = persistentBase.get(member.pos());
+            if (memberValue instanceof GenericArray<?>) {
+              if (((GenericArray)memberValue).size() == 0) {
+                continue;
               }
-          } else {
-            LOG.info("Record not supported: " + value.toString());
-            
+            }
+            this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
           }
+        } else {
+          LOG.warn("Record with value: " + value.toString() + " not supported for field: " + field.name()); 
         }
-        break;
-      case MAP:
-        if (value != null) {
-          if (value instanceof StatefulHashMap<?, ?>) {
-            this.cassandraClient.addStatefulHashMap(key, field.name(), (StatefulHashMap<Utf8,Object>)value);
-          } else {
-            LOG.info("Map not supported: " + value.toString());
-          }
+      }
+      break;
+    case MAP:
+      if (value != null) {
+        if (value instanceof StatefulHashMap<?, ?>) {
+          this.cassandraClient.addStatefulHashMap(key, field.name(), (StatefulHashMap<Utf8,Object>)value);
+        } else {
+          LOG.warn("Map with value: " + value.toString() + " not supported for field: " + field.name());
         }
-        break;
-      case ARRAY:
-        if (value != null) {
-          if (value instanceof GenericArray<?>) {
-            this.cassandraClient.addGenericArray(key, field.name(), (GenericArray)value);
-          } else {
-            LOG.info("Array not supported: " + value.toString());
-          }
+      }
+      break;
+    case ARRAY:
+      if (value != null) {
+        if (value instanceof GenericArray<?>) {
+          this.cassandraClient.addGenericArray(key, field.name(), (GenericArray)value);
+        } else {
+          LOG.warn("Array with value: " + value.toString() + " not supported for field: " + field.name());
         }
-        break;
-       case UNION:
-         if(value != null) {
-           LOG.info("Union being supported with value: " + value.toString());
-           // TODO add union schema index used
-           // adding union value
-           this.cassandraClient.addColumn(key, field.name(), value);
-         } else {
-           LOG.info("Union not supported: " + value.toString());
-         }
-      default:
-        LOG.info("Type not considered: " + type.name());      
+      }
+      break;
+    case UNION:
+      if(value != null) {
+        LOG.debug("Union with value: " + value.toString() + " at index: " + getUnionSchema(value, schema) + " supported for field: " + field.name());
+        this.cassandraClient.addColumn(key, field.name(), value);
+      } else {
+        LOG.warn("Union with value: " + value.toString() + " at index: " + getUnionSchema(value, schema) + " not supported for field: " + field.name());
+      }
+    default:
+      LOG.warn("Type: " + type.name() + " with value: " + value.toString() + 
+          " not considered for field: " + field.name() + ". Please report this to dev@gora.apache.org");
     }
   }
 

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java?rev=1558504&r1=1558503&r2=1558504&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java Wed Jan 15 18:43:51 2014
@@ -28,13 +28,9 @@ import me.prettyprint.hector.api.beans.H
 import me.prettyprint.hector.api.beans.HSuperColumn;
 import me.prettyprint.hector.api.factory.HFactory;
 import me.prettyprint.hector.api.mutation.Mutator;
-import me.prettyprint.hector.api.Serializer;
 
 import org.apache.gora.persistency.Persistent;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * This class it not thread safe.
  * According to Hector's JavaDoc a Mutator isn't thread safe, too.
@@ -42,8 +38,6 @@ import org.slf4j.LoggerFactory;
  */
 public class HectorUtils<K,T extends Persistent> {
 
-  public static final Logger LOG = LoggerFactory.getLogger(HectorUtils.class);
-  
   public static<K> void insertColumn(Mutator<K> mutator, K key, String columnFamily, ByteBuffer columnName, ByteBuffer columnValue) {
     mutator.insert(key, columnFamily, createColumn(columnName, columnValue));
   }
@@ -84,14 +78,17 @@ public class HectorUtils<K,T extends Per
   }
 
 
+  @SuppressWarnings("unchecked")
   public static<K> HSuperColumn<String,ByteBuffer,ByteBuffer> createSuperColumn(String superColumnName, ByteBuffer columnName, ByteBuffer columnValue) {
     return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get());
   }
 
+  @SuppressWarnings("unchecked")
   public static<K> HSuperColumn<String,String,ByteBuffer> createSuperColumn(String superColumnName, String columnName, ByteBuffer columnValue) {
     return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(), StringSerializer.get(), ByteBufferSerializer.get());
   }
 
+  @SuppressWarnings("unchecked")
   public static<K> HSuperColumn<String,Integer,ByteBuffer> createSuperColumn(String superColumnName, Integer columnName, ByteBuffer columnValue) {
     return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(), IntegerSerializer.get(), ByteBufferSerializer.get());
   }

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java?rev=1558504&r1=1558503&r2=1558504&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java Wed Jan 15 18:43:51 2014
@@ -69,7 +69,7 @@ public class GoraRecordWriter<K, T> exte
         store.flush();
       }
     }catch(Exception e){
-      LOG.warn("Exception at GoraRecordWriter.class while writing to datastore." + e.getMessage());
+      LOG.warn("Exception at GoraRecordWriter.class while writing to datastore. " + e.getMessage());
     }
   }
 }