You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@gora.apache.org by le...@apache.org on 2014/01/15 19:43:51 UTC
svn commit: r1558504 - in /gora/trunk: ./
gora-cassandra/src/main/java/org/apache/gora/cassandra/query/
gora-cassandra/src/main/java/org/apache/gora/cassandra/store/
gora-core/src/main/java/org/apache/gora/mapreduce/
Author: lewismc
Date: Wed Jan 15 18:43:51 2014
New Revision: 1558504
URL: http://svn.apache.org/r1558504
Log:
GORA-283 Specify field name for types not being considered in gora-cassandra
Modified:
gora/trunk/CHANGES.txt
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java
Modified: gora/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/gora/trunk/CHANGES.txt?rev=1558504&r1=1558503&r2=1558504&view=diff
==============================================================================
--- gora/trunk/CHANGES.txt (original)
+++ gora/trunk/CHANGES.txt Wed Jan 15 18:43:51 2014
@@ -4,6 +4,8 @@
Gora Change Log
+* GORA-283 Specify field name for types not being considered in gora-cassandra (lewismc)
+
* GORA-285 Change logging at o.a.g.mapreduce.GoraRecordWriter from INFO to WARN (lewismc)
* GORA-117 gora hbase does not have a mechanism to set the caching on a scanner, which makes for poor performance on map/reduce jobs (alfonsonishikawa)
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1558504&r1=1558503&r2=1558504&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Wed Jan 15 18:43:51 2014
@@ -36,7 +36,7 @@ public abstract class CassandraColumn {
public static final int SUB = 0;
public static final int SUPER = 1;
-
+
private String family;
private int type;
private Field field;
@@ -49,7 +49,7 @@ public abstract class CassandraColumn {
public int getUnionType(){
return unionType;
}
-
+
public String getFamily() {
return family;
}
@@ -65,19 +65,20 @@ public abstract class CassandraColumn {
public void setField(Field field) {
this.field = field;
}
-
+
protected Field getField() {
return this.field;
}
-
+
public abstract ByteBuffer getName();
public abstract Object getValue();
-
+
protected Object fromByteBuffer(Schema schema, ByteBuffer byteBuffer) {
Object value = null;
Serializer<?> serializer = GoraSerializerTypeInferer.getSerializer(schema);
if (serializer == null) {
- LOG.info("Schema is not supported: " + schema.toString());
+ LOG.warn("Schema: " + schema.getName() + " is not supported. No serializer "
+ + "could be found. Please report this to dev@gora.apache.org");
} else {
value = serializer.fromByteBuffer(byteBuffer);
}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1558504&r1=1558503&r2=1558504&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Wed Jan 15 18:43:51 2014
@@ -21,7 +21,6 @@ package org.apache.gora.cassandra.query;
import java.nio.ByteBuffer;
import java.util.Map;
-import me.prettyprint.cassandra.serializers.IntegerSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.HSuperColumn;
@@ -29,7 +28,6 @@ import me.prettyprint.hector.api.beans.H
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericArray;
import org.apache.avro.util.Utf8;
import org.apache.gora.cassandra.serializers.Utf8Serializer;
import org.apache.gora.persistency.ListGenericArray;
@@ -119,7 +117,7 @@ public class CassandraSuperColumn extend
}
break;
default:
- LOG.info("Type not supported: " + type);
+ LOG.warn("Type: " + type.name() + " not supported for field: " + field.name());
}
return value;
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1558504&r1=1558503&r2=1558504&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Wed Jan 15 18:43:51 2014
@@ -322,25 +322,24 @@ public class CassandraClient<K, T extend
}
/**
- * Serialize value to ByteBuffer.
- * @param value the member value
+ * Serialize value to ByteBuffer using
+ * {@link org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer#getSerializer(Object)}.
+ * @param value the member value {@link java.lang.Object}.
* @return ByteBuffer object
*/
- @SuppressWarnings("unchecked")
public ByteBuffer toByteBuffer(Object value) {
ByteBuffer byteBuffer = null;
- Serializer serializer = GoraSerializerTypeInferer.getSerializer(value);
+ Serializer<Object> serializer = GoraSerializerTypeInferer.getSerializer(value);
if (serializer == null) {
- LOG.info("Serializer not found for: " + value.toString());
+ LOG.warn("Serializer not found for: " + value.toString());
}
else {
+ LOG.debug(serializer.getClass() + " selected as appropriate Serializer.");
byteBuffer = serializer.toByteBuffer(value);
}
-
if (byteBuffer == null) {
- LOG.info("value class=" + value.getClass().getName() + " value=" + value + " -> null");
+ LOG.warn("Serialization value for: " + value.getClass().getName() + " = null");
}
-
return byteBuffer;
}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1558504&r1=1558503&r2=1558504&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Wed Jan 15 18:43:51 2014
@@ -66,7 +66,7 @@ import org.slf4j.LoggerFactory;
* such as initialization, creating and deleting schemas (Cassandra Keyspaces), etc.
*/
public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
-
+
/** Logging implementation */
public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
@@ -84,12 +84,12 @@ public class CassandraStore<K, T extends
* We don't want to lock the entire collection before iterating over the keys, since in the meantime other threads are adding entries to the map.
*/
private Map<K, T> buffer = Collections.synchronizedMap(new LinkedHashMap<K, T>());
-
+
/** The default constructor for CassandraStore */
public CassandraStore() throws Exception {
// this.cassandraClient.initialize();
}
-
+
/**
* Initialize is called when then the call to
* {@link org.apache.gora.store.DataStoreFactory#createDataStore(Class<D> dataStoreClass, Class<K> keyClass, Class<T> persistent, org.apache.hadoop.conf.Configuration conf)}
@@ -144,19 +144,19 @@ public class CassandraStore<K, T extends
*/
@Override
public Result<K, T> execute(Query<K, T> query) {
-
+
Map<String, List<String>> familyMap = this.cassandraClient.getFamilyMap(query);
Map<String, String> reverseMap = this.cassandraClient.getReverseMap(query);
-
+
CassandraQuery<K, T> cassandraQuery = new CassandraQuery<K, T>();
cassandraQuery.setQuery(query);
cassandraQuery.setFamilyMap(familyMap);
-
+
CassandraResult<K, T> cassandraResult = new CassandraResult<K, T>(this, query);
cassandraResult.setReverseMap(reverseMap);
CassandraResultSet<K> cassandraResultSet = new CassandraResultSet<K>();
-
+
// We query Cassandra keyspace by families.
for (String family : familyMap.keySet()) {
if (family == null) {
@@ -164,17 +164,17 @@ public class CassandraStore<K, T extends
}
if (this.cassandraClient.isSuper(family)) {
addSuperColumns(family, cassandraQuery, cassandraResultSet);
-
+
} else {
addSubColumns(family, cassandraQuery, cassandraResultSet);
}
}
-
+
cassandraResult.setResultSet(cassandraResultSet);
-
+
return cassandraResult;
}
-
+
/**
* When we add subcolumns, Gora keys are mapped to Cassandra partition keys only.
* This is because we follow the Cassandra logic where column family data is
@@ -184,10 +184,10 @@ public class CassandraStore<K, T extends
CassandraResultSet cassandraResultSet) {
// select family columns that are included in the query
List<Row<K, ByteBuffer, ByteBuffer>> rows = this.cassandraClient.execute(cassandraQuery, family);
-
+
for (Row<K, ByteBuffer, ByteBuffer> row : rows) {
K key = row.getKey();
-
+
// find associated row in the resultset
CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key);
if (cassandraRow == null) {
@@ -195,16 +195,16 @@ public class CassandraStore<K, T extends
cassandraResultSet.putRow(key, cassandraRow);
cassandraRow.setKey(key);
}
-
+
ColumnSlice<ByteBuffer, ByteBuffer> columnSlice = row.getColumnSlice();
-
+
for (HColumn<ByteBuffer, ByteBuffer> hColumn : columnSlice.getColumns()) {
CassandraSubColumn cassandraSubColumn = new CassandraSubColumn();
cassandraSubColumn.setValue(hColumn);
cassandraSubColumn.setFamily(family);
cassandraRow.add(cassandraSubColumn);
}
-
+
}
}
@@ -215,7 +215,7 @@ public class CassandraStore<K, T extends
*/
private void addSuperColumns(String family, CassandraQuery<K, T> cassandraQuery,
CassandraResultSet cassandraResultSet) {
-
+
List<SuperRow<K, String, ByteBuffer, ByteBuffer>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family);
for (SuperRow<K, String, ByteBuffer, ByteBuffer> superRow: superRows) {
K key = superRow.getKey();
@@ -225,7 +225,7 @@ public class CassandraStore<K, T extends
cassandraResultSet.putRow(key, cassandraRow);
cassandraRow.setKey(key);
}
-
+
SuperSlice<String, ByteBuffer, ByteBuffer> superSlice = superRow.getSuperSlice();
for (HSuperColumn<String, ByteBuffer, ByteBuffer> hSuperColumn: superSlice.getSuperColumns()) {
CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn();
@@ -242,12 +242,12 @@ public class CassandraStore<K, T extends
*/
@Override
public void flush() {
-
+
Set<K> keys = this.buffer.keySet();
-
+
// this duplicates memory footprint
K[] keyArray = (K[]) keys.toArray();
-
+
// iterating over the key set directly would throw ConcurrentModificationException with java.util.HashMap and subclasses
for (K key: keyArray) {
T value = this.buffer.get(key);
@@ -262,7 +262,7 @@ public class CassandraStore<K, T extends
}
}
}
-
+
// remove flushed rows
for (K key: keyArray) {
this.buffer.remove(key);
@@ -297,7 +297,7 @@ public class CassandraStore<K, T extends
partitions.add(pqi);
return partitions;
}
-
+
/**
* In Cassandra Schemas are referred to as Keyspaces
* @return Keyspace
@@ -326,51 +326,51 @@ public class CassandraStore<K, T extends
int fieldPos = field.pos();
if (value.isDirty(fieldPos)) {
Object fieldValue = value.get(fieldPos);
-
+
// check if field has a nested structure (array, map, or record)
Schema fieldSchema = field.schema();
Type type = fieldSchema.getType();
switch(type) {
- case RECORD:
- PersistentBase persistent = (PersistentBase) fieldValue;
- PersistentBase newRecord = (PersistentBase) persistent.newInstance(new StateManagerImpl());
- for (Field member: fieldSchema.getFields()) {
- newRecord.put(member.pos(), persistent.get(member.pos()));
- }
- fieldValue = newRecord;
- break;
- case MAP:
- StatefulHashMap map = (StatefulHashMap) fieldValue;
- StatefulHashMap newMap = new StatefulHashMap();
- for (Object mapKey : map.keySet()) {
- newMap.put(mapKey, map.get(mapKey));
- newMap.putState(mapKey, map.getState(mapKey));
- }
- fieldValue = newMap;
- break;
- case ARRAY:
- GenericArray array = (GenericArray) fieldValue;
- ListGenericArray newArray = new ListGenericArray(fieldSchema.getElementType());
- Iterator iter = array.iterator();
- while (iter.hasNext()) {
- newArray.add(iter.next());
- }
- fieldValue = newArray;
- break;
- case UNION:
- // storing the union selected schema, the actual value will be stored as soon as getting out of here
- // TODO determine which schema we are using: int schemaPos = getUnionSchema(fieldValue,fieldSchema);
- // and save it p.put( p.getFieldIndex(field.name() + CassandraStore.UNION_COL_SUFIX), schemaPos);
- break;
+ case RECORD:
+ PersistentBase persistent = (PersistentBase) fieldValue;
+ PersistentBase newRecord = (PersistentBase) persistent.newInstance(new StateManagerImpl());
+ for (Field member: fieldSchema.getFields()) {
+ newRecord.put(member.pos(), persistent.get(member.pos()));
+ }
+ fieldValue = newRecord;
+ break;
+ case MAP:
+ StatefulHashMap map = (StatefulHashMap) fieldValue;
+ StatefulHashMap newMap = new StatefulHashMap();
+ for (Object mapKey : map.keySet()) {
+ newMap.put(mapKey, map.get(mapKey));
+ newMap.putState(mapKey, map.getState(mapKey));
+ }
+ fieldValue = newMap;
+ break;
+ case ARRAY:
+ GenericArray array = (GenericArray) fieldValue;
+ ListGenericArray newArray = new ListGenericArray(fieldSchema.getElementType());
+ Iterator iter = array.iterator();
+ while (iter.hasNext()) {
+ newArray.add(iter.next());
+ }
+ fieldValue = newArray;
+ break;
+ case UNION:
+ // storing the union selected schema, the actual value will be stored as soon as getting out of here
+ // TODO determine which schema we are using: int schemaPos = getUnionSchema(fieldValue,fieldSchema);
+ // and save it p.put( p.getFieldIndex(field.name() + CassandraStore.UNION_COL_SUFIX), schemaPos);
+ break;
}
-
+
p.put(fieldPos, fieldValue);
}
}
-
+
// this performs a structural modification of the map
this.buffer.put(key, p);
- }
+ }
/**
* Add a field to Cassandra according to its type.
@@ -381,67 +381,65 @@ public class CassandraStore<K, T extends
private void addOrUpdateField(K key, Field field, Object value) {
Schema schema = field.schema();
Type type = schema.getType();
- switch (type) {
- case STRING:
- case BOOLEAN:
- case INT:
- case LONG:
- case BYTES:
- case FLOAT:
- case DOUBLE:
- case FIXED:
- this.cassandraClient.addColumn(key, field.name(), value);
- break;
- case RECORD:
- if (value != null) {
- if (value instanceof PersistentBase) {
- PersistentBase persistentBase = (PersistentBase) value;
- for (Field member: schema.getFields()) {
-
- // TODO: hack, do not store empty arrays
- Object memberValue = persistentBase.get(member.pos());
- if (memberValue instanceof GenericArray<?>) {
- if (((GenericArray)memberValue).size() == 0) {
- continue;
- }
- }
- this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
+ switch (type) {
+ case STRING:
+ case BOOLEAN:
+ case INT:
+ case LONG:
+ case BYTES:
+ case FLOAT:
+ case DOUBLE:
+ case FIXED:
+ this.cassandraClient.addColumn(key, field.name(), value);
+ break;
+ case RECORD:
+ if (value != null) {
+ if (value instanceof PersistentBase) {
+ PersistentBase persistentBase = (PersistentBase) value;
+ for (Field member: schema.getFields()) {
+
+ // TODO: hack, do not store empty arrays
+ Object memberValue = persistentBase.get(member.pos());
+ if (memberValue instanceof GenericArray<?>) {
+ if (((GenericArray)memberValue).size() == 0) {
+ continue;
}
- } else {
- LOG.info("Record not supported: " + value.toString());
-
+ }
+ this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
}
+ } else {
+ LOG.warn("Record with value: " + value.toString() + " not supported for field: " + field.name());
}
- break;
- case MAP:
- if (value != null) {
- if (value instanceof StatefulHashMap<?, ?>) {
- this.cassandraClient.addStatefulHashMap(key, field.name(), (StatefulHashMap<Utf8,Object>)value);
- } else {
- LOG.info("Map not supported: " + value.toString());
- }
+ }
+ break;
+ case MAP:
+ if (value != null) {
+ if (value instanceof StatefulHashMap<?, ?>) {
+ this.cassandraClient.addStatefulHashMap(key, field.name(), (StatefulHashMap<Utf8,Object>)value);
+ } else {
+ LOG.warn("Map with value: " + value.toString() + " not supported for field: " + field.name());
}
- break;
- case ARRAY:
- if (value != null) {
- if (value instanceof GenericArray<?>) {
- this.cassandraClient.addGenericArray(key, field.name(), (GenericArray)value);
- } else {
- LOG.info("Array not supported: " + value.toString());
- }
+ }
+ break;
+ case ARRAY:
+ if (value != null) {
+ if (value instanceof GenericArray<?>) {
+ this.cassandraClient.addGenericArray(key, field.name(), (GenericArray)value);
+ } else {
+ LOG.warn("Array with value: " + value.toString() + " not supported for field: " + field.name());
}
- break;
- case UNION:
- if(value != null) {
- LOG.info("Union being supported with value: " + value.toString());
- // TODO add union schema index used
- // adding union value
- this.cassandraClient.addColumn(key, field.name(), value);
- } else {
- LOG.info("Union not supported: " + value.toString());
- }
- default:
- LOG.info("Type not considered: " + type.name());
+ }
+ break;
+ case UNION:
+ if(value != null) {
+ LOG.debug("Union with value: " + value.toString() + " at index: " + getUnionSchema(value, schema) + " supported for field: " + field.name());
+ this.cassandraClient.addColumn(key, field.name(), value);
+ } else {
+ LOG.warn("Union with value: " + value.toString() + " at index: " + getUnionSchema(value, schema) + " not supported for field: " + field.name());
+ }
+ default:
+ LOG.warn("Type: " + type.name() + " with value: " + value.toString() +
+ " not considered for field: " + field.name() + ". Please report this to dev@gora.apache.org");
}
}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java?rev=1558504&r1=1558503&r2=1558504&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java Wed Jan 15 18:43:51 2014
@@ -28,13 +28,9 @@ import me.prettyprint.hector.api.beans.H
import me.prettyprint.hector.api.beans.HSuperColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
-import me.prettyprint.hector.api.Serializer;
import org.apache.gora.persistency.Persistent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* This class it not thread safe.
* According to Hector's JavaDoc a Mutator isn't thread safe, too.
@@ -42,8 +38,6 @@ import org.slf4j.LoggerFactory;
*/
public class HectorUtils<K,T extends Persistent> {
- public static final Logger LOG = LoggerFactory.getLogger(HectorUtils.class);
-
public static<K> void insertColumn(Mutator<K> mutator, K key, String columnFamily, ByteBuffer columnName, ByteBuffer columnValue) {
mutator.insert(key, columnFamily, createColumn(columnName, columnValue));
}
@@ -84,14 +78,17 @@ public class HectorUtils<K,T extends Per
}
+ @SuppressWarnings("unchecked")
public static<K> HSuperColumn<String,ByteBuffer,ByteBuffer> createSuperColumn(String superColumnName, ByteBuffer columnName, ByteBuffer columnValue) {
return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get());
}
+ @SuppressWarnings("unchecked")
public static<K> HSuperColumn<String,String,ByteBuffer> createSuperColumn(String superColumnName, String columnName, ByteBuffer columnValue) {
return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(), StringSerializer.get(), ByteBufferSerializer.get());
}
+ @SuppressWarnings("unchecked")
public static<K> HSuperColumn<String,Integer,ByteBuffer> createSuperColumn(String superColumnName, Integer columnName, ByteBuffer columnValue) {
return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(), IntegerSerializer.get(), ByteBufferSerializer.get());
}
Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java?rev=1558504&r1=1558503&r2=1558504&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java Wed Jan 15 18:43:51 2014
@@ -69,7 +69,7 @@ public class GoraRecordWriter<K, T> exte
store.flush();
}
}catch(Exception e){
- LOG.warn("Exception at GoraRecordWriter.class while writing to datastore." + e.getMessage());
+ LOG.warn("Exception at GoraRecordWriter.class while writing to datastore. " + e.getMessage());
}
}
}