You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2012/06/01 15:24:52 UTC
svn commit: r1345164 - in /gora/trunk: ./
gora-cassandra/src/main/java/org/apache/gora/cassandra/query/
gora-cassandra/src/main/java/org/apache/gora/cassandra/store/
Author: lewismc
Date: Fri Jun 1 13:24:51 2012
New Revision: 1345164
URL: http://svn.apache.org/viewvc?rev=1345164&view=rev
Log:
commit to address GORA-138 & 81 and update to CHANGES.txt
Modified:
gora/trunk/CHANGES.txt
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
Modified: gora/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/gora/trunk/CHANGES.txt?rev=1345164&r1=1345163&r2=1345164&view=diff
==============================================================================
--- gora/trunk/CHANGES.txt (original)
+++ gora/trunk/CHANGES.txt Fri Jun 1 13:24:51 2012
@@ -6,13 +6,15 @@ Gora Change Log
0.3 (trunk) Current Development:
+* GORA-138 gora-cassandra array type support: Double fix for GORA-81 Replace CassandraStore#addOrUpdateField with TypeInferringSerializer to take advantage of when the value is already of type ByteBuffer. (Kazuomi Kashii via lewismc)
+
* GORA-139 Creates Cassandra column family with BytesType for column value validator (and comparators), instead of UTF8Type (Kazuomi Kashii via lewismc)
* GORA-131 gora-cassandra should support other key types than String (Kazuomi Kashii via lewismc)
* GORA-132 Uses ByteBufferSerializer for column value to support various data types rather than StringSerializer (Kazuomi Kashii via lewismc)
-* GORA-77 Replace commons logging with Log4j (Renato Javier MarroquÃn Mogrovejo via lewismc)
+* GORA-77 Replace commons logging with Slf4j (Renato Javier MarroquÃn Mogrovejo via lewismc)
* GORA-134 ListGenericArray's hashCode causes StackOverflowError (Kazuomi Kashii via lewismc)
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1345164&r1=1345163&r2=1345164&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Fri Jun 1 13:24:51 2012
@@ -18,13 +18,29 @@
package org.apache.gora.cassandra.query;
-import org.apache.avro.Schema.Field;
+import java.nio.ByteBuffer;
+import me.prettyprint.cassandra.serializers.FloatSerializer;
+import me.prettyprint.cassandra.serializers.DoubleSerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.util.Utf8;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Represents a unit of data: a key value pair tagged by a family name
*/
public abstract class CassandraColumn {
+ public static final Logger LOG = LoggerFactory.getLogger(CassandraColumn.class);
+
public static final int SUB = 0;
public static final int SUPER = 1;
@@ -52,8 +68,37 @@ public abstract class CassandraColumn {
return this.field;
}
- public abstract String getName();
+ public abstract ByteBuffer getName();
public abstract Object getValue();
+ protected Object fromByteBuffer(Type type, ByteBuffer byteBuffer) {
+ Object value = null;
+ switch (type) {
+ case STRING:
+ value = new Utf8(StringSerializer.get().fromByteBuffer(byteBuffer));
+ break;
+ case BYTES:
+ value = byteBuffer;
+ break;
+ case INT:
+ value = IntegerSerializer.get().fromByteBuffer(byteBuffer);
+ break;
+ case LONG:
+ value = LongSerializer.get().fromByteBuffer(byteBuffer);
+ break;
+ case FLOAT:
+ value = FloatSerializer.get().fromByteBuffer(byteBuffer);
+ break;
+ case DOUBLE:
+ value = DoubleSerializer.get().fromByteBuffer(byteBuffer);
+ break;
+
+ default:
+ LOG.info("Type is not supported: " + type);
+
+ }
+ return value;
+ }
+
}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1345164&r1=1345163&r2=1345164&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java Fri Jun 1 13:24:51 2012
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.gora.persistency.Persistent;
@@ -76,7 +78,7 @@ public class CassandraResult<K, T extend
// get field name
String family = cassandraColumn.getFamily();
- String fieldName = this.reverseMap.get(family + ":" + cassandraColumn.getName());
+ String fieldName = this.reverseMap.get(family + ":" + StringSerializer.get().fromByteBuffer(cassandraColumn.getName()));
// get field
int pos = this.persistent.getFieldIndex(fieldName);
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1345164&r1=1345163&r2=1345164&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Fri Jun 1 13:24:51 2012
@@ -51,9 +51,9 @@ public class CassandraSubColumn extends
/**
* Key-value pair containing the raw data.
*/
- private HColumn<String, ByteBuffer> hColumn;
+ private HColumn<ByteBuffer, ByteBuffer> hColumn;
- public String getName() {
+ public ByteBuffer getName() {
return hColumn.getName();
}
@@ -65,56 +65,35 @@ public class CassandraSubColumn extends
Field field = getField();
Schema fieldSchema = field.schema();
Type type = fieldSchema.getType();
- ByteBuffer valueByteBuffer = hColumn.getValue();
+ ByteBuffer byteBuffer = hColumn.getValue();
Object value = null;
-
- switch (type) {
- case STRING:
- value = new Utf8(StringSerializer.get().fromByteBuffer(valueByteBuffer));
- break;
- case BYTES:
- value = valueByteBuffer;
- break;
- case INT:
- value = IntegerSerializer.get().fromByteBuffer(valueByteBuffer);
- break;
- case LONG:
- value = LongSerializer.get().fromByteBuffer(valueByteBuffer);
- break;
- case FLOAT:
- value = FloatSerializer.get().fromByteBuffer(valueByteBuffer);
- break;
- case DOUBLE:
- value = DoubleSerializer.get().fromByteBuffer(valueByteBuffer);
- break;
- case ARRAY:
- // convert string to array
- String valueString = StringSerializer.get().fromByteBuffer(valueByteBuffer);
- valueString = valueString.substring(1, valueString.length()-1);
- String[] elements = valueString.split(", ");
-
- Type elementType = fieldSchema.getElementType().getType();
- if (elementType == Schema.Type.STRING) {
- // the array type is String
- GenericArray<String> genericArray = new GenericData.Array<String>(elements.length, Schema.createArray(Schema.create(Schema.Type.STRING)));
- for (String element: elements) {
- genericArray.add(element);
- }
-
- value = genericArray;
- } else {
- LOG.info("Element type not supported: " + elementType);
+ if (type == Type.ARRAY) {
+ // convert string to array
+ String valueString = StringSerializer.get().fromByteBuffer(byteBuffer);
+ valueString = valueString.substring(1, valueString.length()-1);
+ String[] elements = valueString.split(", ");
+
+ Type elementType = fieldSchema.getElementType().getType();
+ if (elementType == Schema.Type.STRING) {
+ // the array type is String
+ GenericArray<String> genericArray = new GenericData.Array<String>(elements.length, Schema.createArray(Schema.create(Schema.Type.STRING)));
+ for (String element: elements) {
+ genericArray.add(element);
}
- break;
- default:
- LOG.info("Type not supported: " + type);
+
+ value = genericArray;
+ } else {
+ LOG.info("Element type not supported: " + elementType);
+ }
+ }
+ else {
+ value = fromByteBuffer(type, byteBuffer);
}
-
- return value;
+ return value;
}
- public void setValue(HColumn<String, ByteBuffer> hColumn) {
+ public void setValue(HColumn<ByteBuffer, ByteBuffer> hColumn) {
this.hColumn = hColumn;
}
}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1345164&r1=1345163&r2=1345164&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Fri Jun 1 13:24:51 2012
@@ -21,10 +21,7 @@ package org.apache.gora.cassandra.query;
import java.nio.ByteBuffer;
import java.util.Map;
-import me.prettyprint.cassandra.serializers.FloatSerializer;
-import me.prettyprint.cassandra.serializers.DoubleSerializer;
import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.cassandra.serializers.LongSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.HSuperColumn;
@@ -32,7 +29,9 @@ import me.prettyprint.hector.api.beans.H
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.ListGenericArray;
import org.apache.gora.persistency.StatefulHashMap;
import org.apache.gora.persistency.impl.PersistentBase;
import org.slf4j.Logger;
@@ -41,10 +40,10 @@ import org.slf4j.LoggerFactory;
public class CassandraSuperColumn extends CassandraColumn {
public static final Logger LOG = LoggerFactory.getLogger(CassandraSuperColumn.class);
- private HSuperColumn<String, String, ByteBuffer> hSuperColumn;
+ private HSuperColumn<String, ByteBuffer, ByteBuffer> hSuperColumn;
- public String getName() {
- return hSuperColumn.getName();
+ public ByteBuffer getName() {
+ return StringSerializer.get().toByteBuffer(hSuperColumn.getName());
}
public Object getValue() {
@@ -55,37 +54,27 @@ public class CassandraSuperColumn extend
Object value = null;
switch (type) {
+ case ARRAY:
+ Type elementType = fieldSchema.getElementType().getType();
+ GenericArray array = new ListGenericArray(Schema.create(elementType));
+
+ for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
+ ByteBuffer memberByteBuffer = hColumn.getValue();
+ Object memberValue = fromByteBuffer(elementType, hColumn.getValue());
+ // int i = IntegerSerializer().get().fromByteBuffer(hColumn.getName());
+ array.add(memberValue);
+ }
+ value = array;
+
+ break;
case MAP:
Map<Utf8, Object> map = new StatefulHashMap<Utf8, Object>();
Type valueType = fieldSchema.getValueType().getType();
- for (HColumn<String, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
+ for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
ByteBuffer memberByteBuffer = hColumn.getValue();
- Object memberValue = null;
- switch (valueType) {
- case STRING:
- memberValue = new Utf8(StringSerializer.get().fromByteBuffer(memberByteBuffer));
- break;
- case BYTES:
- memberValue = memberByteBuffer;
- break;
- case INT:
- memberValue = IntegerSerializer.get().fromByteBuffer(memberByteBuffer);
- break;
- case LONG:
- memberValue = LongSerializer.get().fromByteBuffer(memberByteBuffer);
- break;
- case FLOAT:
- memberValue = FloatSerializer.get().fromByteBuffer(memberByteBuffer);
- break;
- case DOUBLE:
- memberValue = DoubleSerializer.get().fromByteBuffer(memberByteBuffer);
- break;
- default:
- LOG.info("Type for the map value is not supported: " + valueType);
-
- }
- map.put(new Utf8(hColumn.getName()), memberValue);
+ Object memberValue = fromByteBuffer(valueType, hColumn.getValue());
+ map.put(new Utf8(StringSerializer.get().fromByteBuffer(hColumn.getName())), memberValue);
}
value = map;
@@ -115,12 +104,13 @@ public class CassandraSuperColumn extend
if (value instanceof PersistentBase) {
PersistentBase record = (PersistentBase) value;
- for (HColumn<String, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
- Field memberField = fieldSchema.getField(hColumn.getName());
+ for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
+ String memberName = StringSerializer.get().fromByteBuffer(hColumn.getName());
+ Field memberField = fieldSchema.getField(memberName);
CassandraSubColumn cassandraColumn = new CassandraSubColumn();
cassandraColumn.setField(memberField);
cassandraColumn.setValue(hColumn);
- record.put(record.getFieldIndex(hColumn.getName()), cassandraColumn.getValue());
+ record.put(record.getFieldIndex(memberName), cassandraColumn.getValue());
}
}
break;
@@ -130,8 +120,8 @@ public class CassandraSuperColumn extend
return value;
}
-
- public void setValue(HSuperColumn<String, String, ByteBuffer> hSuperColumn) {
+
+ public void setValue(HSuperColumn<String, ByteBuffer, ByteBuffer> hSuperColumn) {
this.hSuperColumn = hSuperColumn;
}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1345164&r1=1345163&r2=1345164&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Fri Jun 1 13:24:51 2012
@@ -27,6 +27,7 @@ import java.util.Map;
import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
import me.prettyprint.cassandra.serializers.FloatSerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
import me.prettyprint.cassandra.serializers.DoubleSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.serializers.SerializerTypeInferer;
@@ -67,8 +68,6 @@ public class CassandraClient<K, T extend
private CassandraMapping cassandraMapping = new CassandraMapping();
- private StringSerializer stringSerializer = new StringSerializer();
- private ByteBufferSerializer byteBufferSerializer = new ByteBufferSerializer();
private Serializer<K> keySerializer;
public void initialize(Class<K> keyClass) throws Exception {
@@ -118,7 +117,6 @@ public class CassandraClient<K, T extend
keyspaceDefinition = null;
}
-
}
@@ -141,52 +139,53 @@ public class CassandraClient<K, T extend
return;
}
- ByteBuffer byteBuffer = null;
- if (value instanceof ByteBuffer) {
- byteBuffer = (ByteBuffer) value;
- }
- else if (value instanceof Utf8) {
- byteBuffer = stringSerializer.toByteBuffer(((Utf8)value).toString());
- }
- else if (value instanceof Float) {
- // workaround for hector-core-1.0-1.jar
- // because SerializerTypeInferer.getSerializer(Float ) returns ObjectSerializer !?
- byteBuffer = FloatSerializer.get().toByteBuffer((Float)value);
- }
- else if (value instanceof Double) {
- // workaround for hector-core-1.0-1.jar
- // because SerializerTypeInferer.getSerializer(Double ) returns ObjectSerializer !?
- byteBuffer = DoubleSerializer.get().toByteBuffer((Double)value);
- }
- else {
- byteBuffer = SerializerTypeInferer.getSerializer(value).toByteBuffer(value);
- }
+ ByteBuffer byteBuffer = toByteBuffer(value);
String columnFamily = this.cassandraMapping.getFamily(fieldName);
String columnName = this.cassandraMapping.getColumn(fieldName);
- this.mutator.insert(key, columnFamily, HFactory.createColumn(columnName, byteBuffer, stringSerializer, byteBufferSerializer));
+ this.mutator.insert(key, columnFamily, HFactory.createColumn(columnName, byteBuffer, StringSerializer.get(), ByteBufferSerializer.get()));
}
/**
* Insert a member in a super column. This is used for map and record Avro types.
* @param key the row key
* @param fieldName the field name
- * @param memberName the member name
+ * @param columnName the column name (the member name, or the index of array)
* @param value the member value
*/
@SuppressWarnings("unchecked")
-public void addSubColumn(K key, String fieldName, String memberName, Object value) {
+ public void addSubColumn(K key, String fieldName, ByteBuffer columnName, Object value) {
if (value == null) {
return;
}
+ ByteBuffer byteBuffer = toByteBuffer(value);
+
+ String columnFamily = this.cassandraMapping.getFamily(fieldName);
+ String superColumnName = this.cassandraMapping.getColumn(fieldName);
+
+ this.mutator.insert(key, columnFamily, HFactory.createSuperColumn(superColumnName, Arrays.asList(HFactory.createColumn(columnName, byteBuffer, ByteBufferSerializer.get(), ByteBufferSerializer.get())), StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get()));
+
+ }
+
+ /**
+ * Serialize value to ByteBuffer.
+ * @param value the member value
+ * @return ByteBuffer object
+ */
+ @SuppressWarnings("unchecked")
+ public ByteBuffer toByteBuffer(Object value) {
+ if (value == null) {
+ return null;
+ }
+
ByteBuffer byteBuffer = null;
if (value instanceof ByteBuffer) {
byteBuffer = (ByteBuffer) value;
}
else if (value instanceof Utf8) {
- byteBuffer = stringSerializer.toByteBuffer(((Utf8)value).toString());
+ byteBuffer = StringSerializer.get().toByteBuffer(((Utf8)value).toString());
}
else if (value instanceof Float) {
// workaround for hector-core-1.0-1.jar
@@ -202,22 +201,22 @@ public void addSubColumn(K key, String f
byteBuffer = SerializerTypeInferer.getSerializer(value).toByteBuffer(value);
}
- String columnFamily = this.cassandraMapping.getFamily(fieldName);
- String superColumnName = this.cassandraMapping.getColumn(fieldName);
-
- this.mutator.insert(key, columnFamily, HFactory.createSuperColumn(superColumnName, Arrays.asList(HFactory.createColumn(memberName, byteBuffer, stringSerializer, byteBufferSerializer)), this.stringSerializer, this.stringSerializer, this.byteBufferSerializer));
-
+ return byteBuffer;
}
-
+
/**
* Select a family column in the keyspace.
* @param cassandraQuery a wrapper of the query
* @param family the family name to be queried
* @return a list of family rows
*/
- public List<Row<K, String, ByteBuffer>> execute(CassandraQuery<K, T> cassandraQuery, String family) {
+ public List<Row<K, ByteBuffer, ByteBuffer>> execute(CassandraQuery<K, T> cassandraQuery, String family) {
String[] columnNames = cassandraQuery.getColumns(family);
+ ByteBuffer[] columnNameByteBuffers = new ByteBuffer[columnNames.length];
+ for (int i = 0; i < columnNames.length; i++) {
+ columnNameByteBuffers[i] = StringSerializer.get().toByteBuffer(columnNames[i]);
+ }
Query<K, T> query = cassandraQuery.getQuery();
int limit = (int) query.getLimit();
if (limit < 1) {
@@ -226,16 +225,15 @@ public void addSubColumn(K key, String f
K startKey = query.getStartKey();
K endKey = query.getEndKey();
- RangeSlicesQuery<K, String, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.keySerializer, stringSerializer, byteBufferSerializer);
+ RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get());
rangeSlicesQuery.setColumnFamily(family);
rangeSlicesQuery.setKeys(startKey, endKey);
- rangeSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
+ rangeSlicesQuery.setRange(ByteBuffer.wrap(new byte[0]), ByteBuffer.wrap(new byte[0]), false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
rangeSlicesQuery.setRowCount(limit);
- rangeSlicesQuery.setColumnNames(columnNames);
+ rangeSlicesQuery.setColumnNames(columnNameByteBuffers);
-
- QueryResult<OrderedRows<K, String, ByteBuffer>> queryResult = rangeSlicesQuery.execute();
- OrderedRows<K, String, ByteBuffer> orderedRows = queryResult.get();
+ QueryResult<OrderedRows<K, ByteBuffer, ByteBuffer>> queryResult = rangeSlicesQuery.execute();
+ OrderedRows<K, ByteBuffer, ByteBuffer> orderedRows = queryResult.get();
return orderedRows.getList();
@@ -290,7 +288,7 @@ public void addSubColumn(K key, String f
return this.cassandraMapping.isSuper(family);
}
- public List<SuperRow<K, String, String, ByteBuffer>> executeSuper(CassandraQuery<K, T> cassandraQuery, String family) {
+ public List<SuperRow<K, String, ByteBuffer, ByteBuffer>> executeSuper(CassandraQuery<K, T> cassandraQuery, String family) {
String[] columnNames = cassandraQuery.getColumns(family);
Query<K, T> query = cassandraQuery.getQuery();
int limit = (int) query.getLimit();
@@ -300,7 +298,7 @@ public void addSubColumn(K key, String f
K startKey = query.getStartKey();
K endKey = query.getEndKey();
- RangeSuperSlicesQuery<K, String, String, ByteBuffer> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, this.stringSerializer, this.stringSerializer, this.byteBufferSerializer);
+ RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get());
rangeSuperSlicesQuery.setColumnFamily(family);
rangeSuperSlicesQuery.setKeys(startKey, endKey);
rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
@@ -308,8 +306,8 @@ public void addSubColumn(K key, String f
rangeSuperSlicesQuery.setColumnNames(columnNames);
- QueryResult<OrderedSuperRows<K, String, String, ByteBuffer>> queryResult = rangeSuperSlicesQuery.execute();
- OrderedSuperRows<K, String, String, ByteBuffer> orderedRows = queryResult.get();
+ QueryResult<OrderedSuperRows<K, String, ByteBuffer, ByteBuffer>> queryResult = rangeSuperSlicesQuery.execute();
+ OrderedSuperRows<K, String, ByteBuffer, ByteBuffer> orderedRows = queryResult.get();
return orderedRows.getList();
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1345164&r1=1345163&r2=1345164&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Fri Jun 1 13:24:51 2012
@@ -21,12 +21,15 @@ package org.apache.gora.cassandra.store;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.HSuperColumn;
@@ -45,6 +48,7 @@ import org.apache.gora.cassandra.query.C
import org.apache.gora.cassandra.query.CassandraRow;
import org.apache.gora.cassandra.query.CassandraSubColumn;
import org.apache.gora.cassandra.query.CassandraSuperColumn;
+import org.apache.gora.persistency.ListGenericArray;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.StatefulHashMap;
import org.apache.gora.persistency.impl.PersistentBase;
@@ -131,29 +135,28 @@ public class CassandraStore<K, T extends
// We query Cassandra keyspace by families.
for (String family : familyMap.keySet()) {
+ if (family == null) {
+ continue;
+ }
if (this.cassandraClient.isSuper(family)) {
addSuperColumns(family, cassandraQuery, cassandraResultSet);
} else {
addSubColumns(family, cassandraQuery, cassandraResultSet);
-
}
-
}
cassandraResult.setResultSet(cassandraResultSet);
-
return cassandraResult;
-
}
private void addSubColumns(String family, CassandraQuery<K, T> cassandraQuery,
CassandraResultSet cassandraResultSet) {
// select family columns that are included in the query
- List<Row<K, String, ByteBuffer>> rows = this.cassandraClient.execute(cassandraQuery, family);
+ List<Row<K, ByteBuffer, ByteBuffer>> rows = this.cassandraClient.execute(cassandraQuery, family);
- for (Row<K, String, ByteBuffer> row : rows) {
+ for (Row<K, ByteBuffer, ByteBuffer> row : rows) {
K key = row.getKey();
// find associated row in the resultset
@@ -164,9 +167,9 @@ public class CassandraStore<K, T extends
cassandraRow.setKey(key);
}
- ColumnSlice<String, ByteBuffer> columnSlice = row.getColumnSlice();
+ ColumnSlice<ByteBuffer, ByteBuffer> columnSlice = row.getColumnSlice();
- for (HColumn<String, ByteBuffer> hColumn : columnSlice.getColumns()) {
+ for (HColumn<ByteBuffer, ByteBuffer> hColumn : columnSlice.getColumns()) {
CassandraSubColumn cassandraSubColumn = new CassandraSubColumn();
cassandraSubColumn.setValue(hColumn);
cassandraSubColumn.setFamily(family);
@@ -179,8 +182,8 @@ public class CassandraStore<K, T extends
private void addSuperColumns(String family, CassandraQuery<K, T> cassandraQuery,
CassandraResultSet cassandraResultSet) {
- List<SuperRow<K, String, String, ByteBuffer>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family);
- for (SuperRow<K, String, String, ByteBuffer> superRow: superRows) {
+ List<SuperRow<K, String, ByteBuffer, ByteBuffer>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family);
+ for (SuperRow<K, String, ByteBuffer, ByteBuffer> superRow: superRows) {
K key = superRow.getKey();
CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key);
if (cassandraRow == null) {
@@ -189,8 +192,8 @@ public class CassandraStore<K, T extends
cassandraRow.setKey(key);
}
- SuperSlice<String, String, ByteBuffer> superSlice = superRow.getSuperSlice();
- for (HSuperColumn<String, String, ByteBuffer> hSuperColumn: superSlice.getSuperColumns()) {
+ SuperSlice<String, ByteBuffer, ByteBuffer> superSlice = superRow.getSuperSlice();
+ for (HSuperColumn<String, ByteBuffer, ByteBuffer> hSuperColumn: superSlice.getSuperColumns()) {
CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn();
cassandraSuperColumn.setValue(hSuperColumn);
cassandraSuperColumn.setFamily(family);
@@ -277,7 +280,7 @@ public class CassandraStore<K, T extends
if (value.isDirty(field.pos())) {
Object fieldValue = value.get(field.pos());
- // check if field has a nested structure (map or record)
+ // check if field has a nested structure (array, map, or record)
Schema fieldSchema = field.schema();
Type type = fieldSchema.getType();
switch(type) {
@@ -294,6 +297,16 @@ public class CassandraStore<K, T extends
StatefulHashMap<?, ?> newMap = new StatefulHashMap(map);
fieldValue = newMap;
break;
+ case ARRAY:
+ GenericArray array = (GenericArray) fieldValue;
+ Type elementType = fieldSchema.getElementType().getType();
+ GenericArray newArray = new ListGenericArray(Schema.create(elementType));
+ Iterator iter = array.iterator();
+ while (iter.hasNext()) {
+ newArray.add(iter.next());
+ }
+ fieldValue = newArray;
+ break;
}
p.put(field.pos(), fieldValue);
@@ -340,7 +353,7 @@ public class CassandraStore<K, T extends
if (memberValue instanceof Utf8) {
memberValue = memberValue.toString();
}
- this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
+ this.cassandraClient.addSubColumn(key, field.name(), StringSerializer.get().toByteBuffer(member.name()), memberValue);
}
} else {
LOG.info("Record not supported: " + value.toString());
@@ -367,13 +380,29 @@ public class CassandraStore<K, T extends
if (keyValue instanceof Utf8) {
keyValue = keyValue.toString();
}
- this.cassandraClient.addSubColumn(key, field.name(), mapKey.toString(), keyValue);
+ this.cassandraClient.addSubColumn(key, field.name(), StringSerializer.get().toByteBuffer(mapKey.toString()), keyValue);
}
} else {
LOG.info("Map not supported: " + value.toString());
}
}
break;
+ case ARRAY:
+ if (value != null) {
+ if (value instanceof GenericArray<?>) {
+ GenericArray<Object> array = (GenericArray<Object>) value;
+ int i= 0;
+ for (Object itemValue: array) {
+ if (itemValue instanceof Utf8) {
+ itemValue = itemValue.toString();
+ }
+ this.cassandraClient.addSubColumn(key, field.name(), IntegerSerializer.get().toByteBuffer(i++), itemValue);
+ }
+ } else {
+ LOG.info("Array not supported: " + value.toString());
+ }
+ }
+ break;
default:
LOG.info("Type not considered: " + type.name());
}