You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by ka...@apache.org on 2012/07/13 19:11:38 UTC
svn commit: r1361302 - in /gora/trunk: ./
gora-cassandra/src/main/java/org/apache/gora/cassandra/query/
gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/
gora-cassandra/src/main/java/org/apache/gora/cassandra/store/
Author: kazk
Date: Fri Jul 13 17:11:38 2012
New Revision: 1361302
URL: http://svn.apache.org/viewvc?rev=1361302&view=rev
Log:
Adds serializer packages based on Hector's ones with GORA-142 spec
Added:
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
Modified:
gora/trunk/CHANGES.txt
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
Modified: gora/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/gora/trunk/CHANGES.txt?rev=1361302&r1=1361301&r2=1361302&view=diff
==============================================================================
--- gora/trunk/CHANGES.txt (original)
+++ gora/trunk/CHANGES.txt Fri Jul 13 17:11:38 2012
@@ -6,6 +6,8 @@ Gora Change Log
0.3 (trunk) Current Development:
+* GORA-142 Creates org.apache.gora.cassandra.serializers package in order to clean the code of store and query packages and to support additional types in future. (kazk)
+
* GORA-148 CassandraMapping supports only (first) keyspace and class in gora-cassandra-mapping.xml (kazk)
* GORA-143 GoraCompiler needs to add "import FixedSize" statement for FIXED type (kazk)
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1361302&r1=1361301&r2=1361302&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Fri Jul 13 17:11:38 2012
@@ -20,18 +20,12 @@ package org.apache.gora.cassandra.query;
import java.nio.ByteBuffer;
-import me.prettyprint.cassandra.serializers.FloatSerializer;
-import me.prettyprint.cassandra.serializers.DoubleSerializer;
-import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.cassandra.serializers.LongSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.Serializer;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.util.Utf8;
+import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,32 +65,13 @@ public abstract class CassandraColumn {
public abstract ByteBuffer getName();
public abstract Object getValue();
-
- protected Object fromByteBuffer(Type type, ByteBuffer byteBuffer) {
+ protected Object fromByteBuffer(Schema schema, ByteBuffer byteBuffer) {
Object value = null;
- switch (type) {
- case STRING:
- value = new Utf8(StringSerializer.get().fromByteBuffer(byteBuffer));
- break;
- case BYTES:
- value = byteBuffer;
- break;
- case INT:
- value = IntegerSerializer.get().fromByteBuffer(byteBuffer);
- break;
- case LONG:
- value = LongSerializer.get().fromByteBuffer(byteBuffer);
- break;
- case FLOAT:
- value = FloatSerializer.get().fromByteBuffer(byteBuffer);
- break;
- case DOUBLE:
- value = DoubleSerializer.get().fromByteBuffer(byteBuffer);
- break;
-
- default:
- LOG.info("Type is not supported: " + type);
-
+ Serializer serializer = GoraSerializerTypeInferer.getSerializer(schema);
+ if (serializer == null) {
+ LOG.info("Schema is not supported: " + schema.toString());
+ } else {
+ value = serializer.fromByteBuffer(byteBuffer);
}
return value;
}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1361302&r1=1361301&r2=1361302&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java Fri Jul 13 17:11:38 2012
@@ -26,6 +26,7 @@ import me.prettyprint.cassandra.serializ
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
+import org.apache.avro.specific.SpecificFixed;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.query.Query;
import org.apache.gora.query.impl.ResultBase;
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1361302&r1=1361301&r2=1361302&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Fri Jul 13 17:11:38 2012
@@ -37,6 +37,8 @@ import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
+import org.apache.gora.cassandra.serializers.GenericArraySerializer;
+import org.apache.gora.cassandra.serializers.TypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,28 +68,16 @@ public class CassandraSubColumn extends
Schema fieldSchema = field.schema();
Type type = fieldSchema.getType();
ByteBuffer byteBuffer = hColumn.getValue();
+ if (byteBuffer == null) {
+ return null;
+ }
Object value = null;
if (type == Type.ARRAY) {
- // convert string to array
- String valueString = StringSerializer.get().fromByteBuffer(byteBuffer);
- valueString = valueString.substring(1, valueString.length()-1);
- String[] elements = valueString.split(", ");
-
- Type elementType = fieldSchema.getElementType().getType();
- if (elementType == Schema.Type.STRING) {
- // the array type is String
- GenericArray<String> genericArray = new GenericData.Array<String>(elements.length, Schema.createArray(Schema.create(Schema.Type.STRING)));
- for (String element: elements) {
- genericArray.add(element);
- }
-
- value = genericArray;
- } else {
- LOG.info("Element type not supported: " + elementType);
- }
- }
- else {
- value = fromByteBuffer(type, byteBuffer);
+ GenericArraySerializer serializer = GenericArraySerializer.get(fieldSchema.getElementType());
+ GenericArray genericArray = serializer.fromByteBuffer(byteBuffer);
+ value = genericArray;
+ } else {
+ value = fromByteBuffer(fieldSchema, byteBuffer);
}
return value;
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1361302&r1=1361301&r2=1361302&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Fri Jul 13 17:11:38 2012
@@ -31,6 +31,7 @@ import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.util.Utf8;
+import org.apache.gora.cassandra.serializers.Utf8Serializer;
import org.apache.gora.persistency.ListGenericArray;
import org.apache.gora.persistency.StatefulHashMap;
import org.apache.gora.persistency.impl.PersistentBase;
@@ -55,12 +56,11 @@ public class CassandraSuperColumn extend
switch (type) {
case ARRAY:
- Type elementType = fieldSchema.getElementType().getType();
- GenericArray array = new ListGenericArray(Schema.create(elementType));
+ ListGenericArray array = new ListGenericArray(fieldSchema.getElementType());
for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
ByteBuffer memberByteBuffer = hColumn.getValue();
- Object memberValue = fromByteBuffer(elementType, hColumn.getValue());
+ Object memberValue = fromByteBuffer(fieldSchema.getElementType(), hColumn.getValue());
// int i = IntegerSerializer().get().fromByteBuffer(hColumn.getName());
array.add(memberValue);
}
@@ -69,12 +69,12 @@ public class CassandraSuperColumn extend
break;
case MAP:
Map<Utf8, Object> map = new StatefulHashMap<Utf8, Object>();
- Type valueType = fieldSchema.getValueType().getType();
for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
ByteBuffer memberByteBuffer = hColumn.getValue();
- Object memberValue = fromByteBuffer(valueType, hColumn.getValue());
- map.put(new Utf8(StringSerializer.get().fromByteBuffer(hColumn.getName())), memberValue);
+ Object memberValue = null;
+ memberValue = fromByteBuffer(fieldSchema.getValueType(), hColumn.getValue());
+ map.put(Utf8Serializer.get().fromByteBuffer(hColumn.getName()), memberValue);
}
value = map;
@@ -106,6 +106,10 @@ public class CassandraSuperColumn extend
for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
String memberName = StringSerializer.get().fromByteBuffer(hColumn.getName());
+ if (memberName == null || memberName.length() == 0) {
+ LOG.warn("member name is null or empty.");
+ continue;
+ }
Field memberField = fieldSchema.getField(memberName);
CassandraSubColumn cassandraColumn = new CassandraSubColumn();
cassandraColumn.setField(memberField);
Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java?rev=1361302&view=auto
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java (added)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java Fri Jul 13 17:11:38 2012
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import me.prettyprint.cassandra.serializers.AbstractSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.hector.api.Serializer;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificFixed;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.ListGenericArray;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A GenericArraySerializer translates the byte[] to and from GenericArray of Avro.
+ */
+public class GenericArraySerializer<T> extends AbstractSerializer<GenericArray<T>> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(GenericArraySerializer.class);
+
+ private static Map<Type, GenericArraySerializer> elementTypeToSerializerMap = new HashMap<Type, GenericArraySerializer>();
+ private static Map<Class, GenericArraySerializer> fixedClassToSerializerMap = new HashMap<Class, GenericArraySerializer>();
+
+ public static GenericArraySerializer get(Type elementType) {
+ GenericArraySerializer serializer = elementTypeToSerializerMap.get(elementType);
+ if (serializer == null) {
+ serializer = new GenericArraySerializer(elementType);
+ elementTypeToSerializerMap.put(elementType, serializer);
+ }
+ return serializer;
+ }
+
+ public static GenericArraySerializer get(Type elementType, Class clazz) {
+ if (elementType != Type.FIXED) {
+ return null;
+ }
+ GenericArraySerializer serializer = elementTypeToSerializerMap.get(clazz);
+ if (serializer == null) {
+ serializer = new GenericArraySerializer(clazz);
+ fixedClassToSerializerMap.put(clazz, serializer);
+ }
+ return serializer;
+ }
+
+ public static GenericArraySerializer get(Schema elementSchema) {
+ Type type = elementSchema.getType();
+ if (type == Type.FIXED) {
+ return get(Type.FIXED, TypeUtils.getClass(elementSchema));
+ } else {
+ return get(type);
+ }
+ }
+
+ private Schema elementSchema = null;
+ private Type elementType = null;
+ private int size = -1;
+ private Class<T> clazz = null;
+ private Serializer<T> elementSerializer = null;
+
+ public GenericArraySerializer(Serializer<T> elementSerializer) {
+ this.elementSerializer = elementSerializer;
+ }
+
+ public GenericArraySerializer(Schema elementSchema) {
+ this.elementSchema = elementSchema;
+ elementType = elementSchema.getType();
+ size = TypeUtils.getFixedSize(elementSchema);
+ elementSerializer = GoraSerializerTypeInferer.getSerializer(elementSchema);
+ }
+
+ public GenericArraySerializer(Type elementType) {
+ this.elementType = elementType;
+ if (elementType != Type.FIXED) {
+ elementSchema = Schema.create(elementType);
+ }
+ clazz = TypeUtils.getClass(elementType);
+ size = TypeUtils.getFixedSize(elementType);
+ elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType);
+ }
+
+ public GenericArraySerializer(Class<T> clazz) {
+ this.clazz = clazz;
+ elementType = TypeUtils.getType(clazz);
+ size = TypeUtils.getFixedSize(clazz);
+ if (elementType == null || elementType == Type.FIXED) {
+ elementType = Type.FIXED;
+ elementSchema = TypeUtils.getSchema(clazz);
+ elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType, clazz);
+ } else {
+ elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType);
+ }
+ }
+
+ @Override
+ public ByteBuffer toByteBuffer(GenericArray<T> array) {
+ if (array == null) {
+ return null;
+ }
+ if (size > 0) {
+ return toByteBufferWithFixedLengthElements(array);
+ } else {
+ return toByteBufferWithVariableLengthElements(array);
+ }
+ }
+
+ private ByteBuffer toByteBufferWithFixedLengthElements(GenericArray<T> array) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate((int) array.size() * size);
+ for (T element : array) {
+ byteBuffer.put(elementSerializer.toByteBuffer(element));
+ }
+ byteBuffer.rewind();
+ return byteBuffer;
+ }
+
+ private ByteBuffer toByteBufferWithVariableLengthElements(GenericArray<T> array) {
+ int n = (int) array.size();
+ List<byte[]> list = new ArrayList<byte[]>(n);
+ n *= 4;
+ for (T element : array) {
+ byte[] bytes = BytesArraySerializer.get().fromByteBuffer(elementSerializer.toByteBuffer(element));
+ list.add(bytes);
+ n += bytes.length;
+ }
+ ByteBuffer byteBuffer = ByteBuffer.allocate(n);
+ for (byte[] bytes : list) {
+ byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length));
+ byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes));
+ }
+ byteBuffer.rewind();
+ return byteBuffer;
+ }
+
+ @Override
+ public GenericArray<T> fromByteBuffer(ByteBuffer byteBuffer) {
+ if (byteBuffer == null) {
+ return null;
+ }
+ GenericArray<T> array = new ListGenericArray<T>(elementSchema);
+int i = 0;
+ while (true) {
+ T element = null;
+ try {
+ if (size > 0) {
+ element = elementSerializer.fromByteBuffer(byteBuffer);
+ }
+ else {
+ int n = IntegerSerializer.get().fromByteBuffer(byteBuffer);
+ byte[] bytes = new byte[n];
+ byteBuffer.get(bytes, 0, n);
+ element = elementSerializer.fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
+ }
+ } catch (BufferUnderflowException e) {
+ break;
+ }
+ if (element == null) {
+ break;
+ }
+ array.add(element);
+ }
+ return array;
+ }
+
+ @Override
+ public ComparatorType getComparatorType() {
+ return elementSerializer.getComparatorType();
+ }
+
+}
Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java?rev=1361302&view=auto
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java (added)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java Fri Jul 13 17:11:38 2012
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import java.nio.ByteBuffer;
+
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.BooleanSerializer;
+import me.prettyprint.cassandra.serializers.DoubleSerializer;
+import me.prettyprint.cassandra.serializers.FloatSerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.SerializerTypeInferer;
+import me.prettyprint.hector.api.Serializer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificFixed;
+import org.apache.avro.util.Utf8;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that infers the concrete Serializer needed to turn a value into
+ * its binary representation
+ */
+public class GoraSerializerTypeInferer {
+
+ public static final Logger LOG = LoggerFactory.getLogger(GoraSerializerTypeInferer.class);
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static <T> Serializer<T> getSerializer(Object value) {
+ Serializer serializer = null;
+ if (value == null) {
+ serializer = ByteBufferSerializer.get();
+ } else if (value instanceof Utf8) {
+ serializer = Utf8Serializer.get();
+ } else if (value instanceof Boolean) {
+ serializer = BooleanSerializer.get();
+ } else if (value instanceof ByteBuffer) {
+ serializer = ByteBufferSerializer.get();
+ } else if (value instanceof byte[]) {
+ serializer = BytesArraySerializer.get();
+ } else if (value instanceof Double) {
+ serializer = DoubleSerializer.get();
+ } else if (value instanceof Float) {
+ serializer = FloatSerializer.get();
+ } else if (value instanceof Integer) {
+ serializer = IntegerSerializer.get();
+ } else if (value instanceof Long) {
+ serializer = LongSerializer.get();
+ } else if (value instanceof String) {
+ serializer = StringSerializer.get();
+ } else if (value instanceof SpecificFixed) {
+ serializer = SpecificFixedSerializer.get(value.getClass());
+ } else if (value instanceof GenericArray) {
+ Schema schema = ((GenericArray)value).getSchema();
+ if (schema.getType() == Type.ARRAY) {
+ schema = schema.getElementType();
+ }
+ serializer = GenericArraySerializer.get(schema);
+ } else {
+ serializer = SerializerTypeInferer.getSerializer(value);
+ }
+ return serializer;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static <T> Serializer<T> getSerializer(Class<?> valueClass) {
+ Serializer serializer = null;
+ if (valueClass.equals(Utf8.class)) {
+ serializer = Utf8Serializer.get();
+ } else if (valueClass.equals(Boolean.class) || valueClass.equals(boolean.class)) {
+ serializer = BooleanSerializer.get();
+ } else if (valueClass.equals(ByteBuffer.class)) {
+ serializer = ByteBufferSerializer.get();
+ } else if (valueClass.equals(Double.class) || valueClass.equals(double.class)) {
+ serializer = DoubleSerializer.get();
+ } else if (valueClass.equals(Float.class) || valueClass.equals(float.class)) {
+ serializer = FloatSerializer.get();
+ } else if (valueClass.equals(Integer.class) || valueClass.equals(int.class)) {
+ serializer = IntegerSerializer.get();
+ } else if (valueClass.equals(Long.class) || valueClass.equals(long.class)) {
+ serializer = LongSerializer.get();
+ } else if (valueClass.equals(String.class)) {
+ serializer = StringSerializer.get();
+ } else {
+ serializer = SerializerTypeInferer.getSerializer(valueClass);
+ }
+ return serializer;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static <T> Serializer<T> getSerializer(Schema schema) {
+ Serializer serializer = null;
+ Type type = schema.getType();
+ if (type == Type.STRING) {
+ serializer = Utf8Serializer.get();
+ } else if (type == Type.BOOLEAN) {
+ serializer = BooleanSerializer.get();
+ } else if (type == Type.BYTES) {
+ serializer = ByteBufferSerializer.get();
+ } else if (type == Type.DOUBLE) {
+ serializer = DoubleSerializer.get();
+ } else if (type == Type.FLOAT) {
+ serializer = FloatSerializer.get();
+ } else if (type == Type.INT) {
+ serializer = IntegerSerializer.get();
+ } else if (type == Type.LONG) {
+ serializer = LongSerializer.get();
+ } else if (type == Type.FIXED) {
+ Class clazz = TypeUtils.getClass(schema);
+ serializer = SpecificFixedSerializer.get(clazz);
+ // serializer = SpecificFixedSerializer.get(schema);
+ } else if (type == Type.ARRAY) {
+ serializer = GenericArraySerializer.get(schema.getElementType());
+ } else {
+ serializer = null;
+ }
+ return serializer;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static <T> Serializer<T> getSerializer(Type type) {
+ Serializer serializer = null;
+ if (type == Type.STRING) {
+ serializer = Utf8Serializer.get();
+ } else if (type == Type.BOOLEAN) {
+ serializer = BooleanSerializer.get();
+ } else if (type == Type.BYTES) {
+ serializer = ByteBufferSerializer.get();
+ } else if (type == Type.DOUBLE) {
+ serializer = DoubleSerializer.get();
+ } else if (type == Type.FLOAT) {
+ serializer = FloatSerializer.get();
+ } else if (type == Type.INT) {
+ serializer = IntegerSerializer.get();
+ } else if (type == Type.LONG) {
+ serializer = LongSerializer.get();
+ } else if (type == Type.FIXED) {
+ serializer = SpecificFixedSerializer.get();
+ } else {
+ serializer = null;
+ }
+ return serializer;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static <T> Serializer<T> getSerializer(Type type, Type elementType) {
+ Serializer serializer = null;
+ if (type == null) {
+ if (elementType == null) {
+ serializer = null;
+ } else {
+ serializer = getSerializer(elementType);
+ }
+ } else {
+ if (elementType == null) {
+ serializer = getSerializer(type);
+ }
+ }
+
+ if (type == Type.ARRAY) {
+ serializer = GenericArraySerializer.get(elementType);
+ } else {
+ serializer = null;
+ }
+ return serializer;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static <T> Serializer<T> getSerializer(Type type, Class<T> clazz) {
+ Serializer serializer = null;
+ if (type != Type.FIXED) {
+ serializer = null;
+ }
+ if (clazz == null) {
+ serializer = null;
+ } else {
+ serializer = SpecificFixedSerializer.get(clazz);
+ }
+ return serializer;
+ }
+
+}
Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java?rev=1361302&view=auto
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java (added)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java Fri Jul 13 17:11:38 2012
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import me.prettyprint.cassandra.serializers.AbstractSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.hector.api.Serializer;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+import static me.prettyprint.hector.api.ddl.ComparatorType.BYTESTYPE;
+
+import org.apache.avro.specific.SpecificFixed;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.util.Utf8;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SpecificFixedSerializer translates the byte[] to and from SpecificFixed of Avro.
+ */
+public class SpecificFixedSerializer extends AbstractSerializer<SpecificFixed> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(SpecificFixedSerializer.class);
+
+ // for toByteBuffer
+ private static SpecificFixedSerializer serializer = new SpecificFixedSerializer(SpecificFixed.class);
+
+ // for fromByteBuffer, requiring Class info
+ public static SpecificFixedSerializer get() {
+ return serializer;
+ }
+
+ private static Map<Class, SpecificFixedSerializer> classToSerializerMap = new HashMap<Class, SpecificFixedSerializer>();
+
+ public static SpecificFixedSerializer get(Class clazz) {
+ SpecificFixedSerializer serializer = classToSerializerMap.get(clazz);
+ if (serializer == null) {
+ serializer = new SpecificFixedSerializer(clazz);
+ classToSerializerMap.put(clazz, serializer);
+ }
+ return serializer;
+ }
+
+ private Class<? extends SpecificFixed> clazz;
+
+ public SpecificFixedSerializer(Class<? extends SpecificFixed> clazz) {
+ this.clazz = clazz;
+ }
+
+ @Override
+ public ByteBuffer toByteBuffer(SpecificFixed fixed) {
+ if (fixed == null) {
+ return null;
+ }
+ byte[] bytes = fixed.bytes();
+ if (bytes.length < 1) {
+ return null;
+ }
+ return BytesArraySerializer.get().toByteBuffer(bytes);
+ }
+
+ @Override
+ public SpecificFixed fromByteBuffer(ByteBuffer byteBuffer) {
+ if (byteBuffer == null) {
+ return null;
+ }
+
+ Object value = null;
+ try {
+ value = clazz.newInstance();
+ } catch (InstantiationException ie) {
+ LOG.warn("Instantiation error for class=" + clazz, ie);
+ return null;
+ } catch (IllegalAccessException iae) {
+ LOG.warn("Illegal access error for class=" + clazz, iae);
+ return null;
+ }
+
+ if (! (value instanceof SpecificFixed)) {
+ LOG.warn("Not an instance of SpecificFixed");
+ return null;
+ }
+
+ SpecificFixed fixed = (SpecificFixed) value;
+ byte[] bytes = fixed.bytes();
+ try {
+ byteBuffer.get(bytes, 0, bytes.length);
+ }
+ catch (BufferUnderflowException e) {
+ // LOG.info(e.toString() + " : class=" + clazz.getName() + " length=" + bytes.length);
+ throw e;
+ }
+ fixed.bytes(bytes);
+ return fixed;
+ }
+
+ @Override
+ public ComparatorType getComparatorType() {
+ return BYTESTYPE;
+ }
+
+}
Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java?rev=1361302&view=auto
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java (added)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java Fri Jul 13 17:11:38 2012
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificFixed;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.ListGenericArray;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.StatefulHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that infers the concrete Serializer needed to turn a value into
+ * its binary representation
+ */
+public class TypeUtils {
+
+ public static final Logger LOG = LoggerFactory.getLogger(TypeUtils.class);
+
+ // @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static Class getClass(Object value) {
+ return value.getClass();
+ }
+
+ public static Schema getSchema(Object value) {
+ if (value instanceof GenericArray) {
+ return Schema.createArray( getElementSchema((GenericArray)value) );
+ } else {
+ return getSchema( getClass(value) );
+ }
+ }
+
+ public static Type getType(Object value) {
+ return getType( getClass(value) );
+ }
+
+ public static Type getType(Class<?> clazz) {
+ if (clazz.equals(Utf8.class)) {
+ return Type.STRING;
+ } else if (clazz.equals(Boolean.class) || clazz.equals(boolean.class)) {
+ return Type.BOOLEAN;
+ } else if (clazz.equals(ByteBuffer.class)) {
+ return Type.BYTES;
+ } else if (clazz.equals(Double.class) || clazz.equals(double.class)) {
+ return Type.DOUBLE;
+ } else if (clazz.equals(Float.class) || clazz.equals(float.class)) {
+ return Type.FLOAT;
+ } else if (clazz.equals(Integer.class) || clazz.equals(int.class)) {
+ return Type.INT;
+ } else if (clazz.equals(Long.class) || clazz.equals(long.class)) {
+ return Type.LONG;
+ } else if (clazz.equals(ListGenericArray.class)) {
+ return Type.ARRAY;
+ } else if (clazz.equals(StatefulHashMap.class)) {
+ return Type.MAP;
+ } else if (clazz.equals(Persistent.class)) {
+ return Type.RECORD;
+ } else if (clazz.getSuperclass().equals(SpecificFixed.class)) {
+ return Type.FIXED;
+ } else {
+ return null;
+ }
+ }
+
+ public static Class getClass(Type type) {
+ if (type == Type.STRING) {
+ return Utf8.class;
+ } else if (type == Type.BOOLEAN) {
+ return Boolean.class;
+ } else if (type == Type.BYTES) {
+ return ByteBuffer.class;
+ } else if (type == Type.DOUBLE) {
+ return Double.class;
+ } else if (type == Type.FLOAT) {
+ return Float.class;
+ } else if (type == Type.INT) {
+ return Integer.class;
+ } else if (type == Type.LONG) {
+ return Long.class;
+ } else if (type == Type.ARRAY) {
+ return ListGenericArray.class;
+ } else if (type == Type.MAP) {
+ return StatefulHashMap.class;
+ } else if (type == Type.RECORD) {
+ return Persistent.class;
+ } else if (type == Type.FIXED) {
+ // return SpecificFixed.class;
+ return null;
+ } else {
+ return null;
+ }
+ }
+
+ public static Schema getSchema(Class clazz) {
+ Type type = getType(clazz);
+ if (type == null) {
+ return null;
+ } else if (type == Type.FIXED) {
+ int size = getFixedSize(clazz);
+ String name = clazz.getName();
+ String space = null;
+ int n = name.lastIndexOf(".");
+ if (n < 0) {
+ space = name.substring(0,n);
+ name = name.substring(n+1);
+ } else {
+ space = null;
+ }
+ String doc = null; // ?
+ // LOG.info(Schema.createFixed(name, doc, space, size).toString());
+ return Schema.createFixed(name, doc, space, size);
+ } else if (type == Type.ARRAY) {
+ Object obj = null;
+ try {
+ obj = clazz.newInstance();
+ } catch (InstantiationException e) {
+ LOG.warn(e.toString());
+ return null;
+ } catch (IllegalAccessException e) {
+ LOG.warn(e.toString());
+ return null;
+ }
+ return getSchema(obj);
+ } else if (type == Type.MAP) {
+ // TODO
+ // return Schema.createMap(...);
+ return null;
+ } else if (type == Type.RECORD) {
+ // TODO
+ // return Schema.createRecord(...);
+ return null;
+ } else {
+ return Schema.create(type);
+ }
+ }
+
+ public static Class getClass(Schema schema) {
+ Type type = schema.getType();
+ if (type == null) {
+ return null;
+ } else if (type == Type.FIXED) {
+ try {
+ return Class.forName( schema.getFullName() );
+ } catch (ClassNotFoundException e) {
+ LOG.warn(e.toString() + " : " + schema);
+ return null;
+ }
+ } else {
+ return getClass(type);
+ }
+ }
+
+ public static int getFixedSize(Type type) {
+ if (type == Type.BOOLEAN) {
+ return 1;
+ } else if (type == Type.DOUBLE) {
+ return 8;
+ } else if (type == Type.FLOAT) {
+ return 4;
+ } else if (type == Type.INT) {
+ return 4;
+ } else if (type == Type.LONG) {
+ return 8;
+ } else {
+ return -1;
+ }
+ }
+
+ public static int getFixedSize(Schema schema) {
+ Type type = schema.getType();
+ if (type == Type.FIXED) {
+ return schema.getFixedSize();
+ } else {
+ return getFixedSize(type);
+ }
+ }
+
+ public static int getFixedSize(Class clazz) {
+ Type type = getType(clazz);
+ if (type == Type.FIXED) {
+ try {
+ return ((SpecificFixed)clazz.newInstance()).bytes().length;
+ } catch (InstantiationException e) {
+ LOG.warn(e.toString());
+ return -1;
+ } catch (IllegalAccessException e) {
+ LOG.warn(e.toString());
+ return -1;
+ }
+ } else {
+ return getFixedSize(type);
+ }
+ }
+
+ public static Schema getElementSchema(GenericArray array) {
+ Schema schema = array.getSchema();
+ return (schema.getType() == Type.ARRAY) ? schema.getElementType() : schema;
+ }
+
+ public static Type getElementType(ListGenericArray array) {
+ return getElementSchema(array).getType();
+ }
+
+ /*
+ public static Schema getValueSchema(StatefulHashMap map) {
+ return map.getSchema().getValueType();
+ }
+
+ public static Type getValueType(StatefulHashMap map) {
+ return getValueSchema(map).getType();
+ }
+ */
+
+}
Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java?rev=1361302&view=auto
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java (added)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java Fri Jul 13 17:11:38 2012
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import java.nio.ByteBuffer;
+
+import me.prettyprint.cassandra.serializers.AbstractSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE;
+
+import org.apache.avro.util.Utf8;
+
+/**
+ * A Utf8Serializer translates the byte[] to and from Utf8 object of Avro.
+ */
+public final class Utf8Serializer extends AbstractSerializer<Utf8> {
+
+ private static final Utf8Serializer instance = new Utf8Serializer();
+
+ public static Utf8Serializer get() {
+ return instance;
+ }
+
+ @Override
+ public ByteBuffer toByteBuffer(Utf8 obj) {
+ if (obj == null) {
+ return null;
+ }
+ return StringSerializer.get().toByteBuffer(obj.toString());
+ }
+
+ @Override
+ public Utf8 fromByteBuffer(ByteBuffer byteBuffer) {
+ if (byteBuffer == null) {
+ return null;
+ }
+ return new Utf8(StringSerializer.get().fromByteBuffer(byteBuffer));
+ }
+
+ @Override
+ public ComparatorType getComparatorType() {
+ return UTF8TYPE;
+ }
+
+}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1361302&r1=1361301&r2=1361302&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Fri Jul 13 17:11:38 2012
@@ -25,12 +25,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
-import me.prettyprint.cassandra.serializers.FloatSerializer;
import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.cassandra.serializers.DoubleSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
-import me.prettyprint.cassandra.serializers.SerializerTypeInferer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
@@ -45,12 +43,17 @@ import me.prettyprint.hector.api.mutatio
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.RangeSlicesQuery;
import me.prettyprint.hector.api.query.RangeSuperSlicesQuery;
-import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Serializer;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
import org.apache.avro.util.Utf8;
import org.apache.gora.cassandra.query.CassandraQuery;
+import org.apache.gora.cassandra.serializers.GenericArraySerializer;
+import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
+import org.apache.gora.cassandra.serializers.TypeUtils;
import org.apache.gora.mapreduce.GoraRecordReader;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.query.Query;
@@ -86,7 +89,7 @@ public class CassandraClient<K, T extend
// Just create a Keyspace object on the client side, corresponding to an already existing keyspace with already created column families.
this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
- this.keySerializer = SerializerTypeInferer.getSerializer(keyClass);
+ this.keySerializer = GoraSerializerTypeInferer.getSerializer(keyClass);
this.mutator = HFactory.createMutator(this.keyspace, this.keySerializer);
}
@@ -148,8 +151,12 @@ public class CassandraClient<K, T extend
String columnFamily = this.cassandraMapping.getFamily(fieldName);
String columnName = this.cassandraMapping.getColumn(fieldName);
+ if (columnName == null) {
+ LOG.warn("Column name is null for field=" + fieldName + " with value=" + value.toString());
+ return;
+ }
- this.mutator.insert(key, columnFamily, HFactory.createColumn(columnName, byteBuffer, StringSerializer.get(), ByteBufferSerializer.get()));
+ HectorUtils.insertColumn(mutator, key, columnFamily, columnName, byteBuffer);
}
/**
@@ -164,14 +171,43 @@ public class CassandraClient<K, T extend
if (value == null) {
return;
}
-
+
ByteBuffer byteBuffer = toByteBuffer(value);
String columnFamily = this.cassandraMapping.getFamily(fieldName);
String superColumnName = this.cassandraMapping.getColumn(fieldName);
- this.mutator.insert(key, columnFamily, HFactory.createSuperColumn(superColumnName, Arrays.asList(HFactory.createColumn(columnName, byteBuffer, ByteBufferSerializer.get(), ByteBufferSerializer.get())), StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get()));
-
+ HectorUtils.insertSubColumn(mutator, key, columnFamily, superColumnName, columnName, byteBuffer);
+ }
+
+ public void addSubColumn(K key, String fieldName, String columnName, Object value) {
+ addSubColumn(key, fieldName, StringSerializer.get().toByteBuffer(columnName), value);
+ }
+
+ public void addSubColumn(K key, String fieldName, Integer columnName, Object value) {
+ addSubColumn(key, fieldName, IntegerSerializer.get().toByteBuffer(columnName), value);
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public void addGenericArray(K key, String fieldName, GenericArray array) {
+ if (isSuper( cassandraMapping.getFamily(fieldName) )) {
+ int i= 0;
+ for (Object itemValue: array) {
+
+ // TODO: hack, do not store empty arrays
+ if (itemValue instanceof GenericArray<?>) {
+ if (((GenericArray)itemValue).size() == 0) {
+ continue;
+ }
+ }
+
+ addSubColumn(key, fieldName, i++, itemValue);
+ }
+ }
+ else {
+ addColumn(key, fieldName, array);
+ }
}
/**
@@ -181,29 +217,17 @@ public class CassandraClient<K, T extend
*/
@SuppressWarnings("unchecked")
public ByteBuffer toByteBuffer(Object value) {
- if (value == null) {
- return null;
- }
-
ByteBuffer byteBuffer = null;
- if (value instanceof ByteBuffer) {
- byteBuffer = (ByteBuffer) value;
- }
- else if (value instanceof Utf8) {
- byteBuffer = StringSerializer.get().toByteBuffer(((Utf8)value).toString());
- }
- else if (value instanceof Float) {
- // workaround for hector-core-1.0-1.jar
- // because SerializerTypeInferer.getSerializer(Float ) returns ObjectSerializer !?
- byteBuffer = FloatSerializer.get().toByteBuffer((Float)value);
- }
- else if (value instanceof Double) {
- // workaround for hector-core-1.0-1.jar
- // because SerializerTypeInferer.getSerializer(Double ) returns ObjectSerializer !?
- byteBuffer = DoubleSerializer.get().toByteBuffer((Double)value);
+ Serializer serializer = GoraSerializerTypeInferer.getSerializer(value);
+ if (serializer == null) {
+ LOG.info("Serializer not found for: " + value.toString());
}
else {
- byteBuffer = SerializerTypeInferer.getSerializer(value).toByteBuffer(value);
+ byteBuffer = serializer.toByteBuffer(value);
+ }
+
+ if (byteBuffer == null) {
+ LOG.info("value class=" + value.getClass().getName() + " value=" + value + " -> null");
}
return byteBuffer;
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1361302&r1=1361301&r2=1361302&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Fri Jul 13 17:11:38 2012
@@ -41,6 +41,7 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificFixed;
import org.apache.avro.util.Utf8;
import org.apache.gora.cassandra.query.CassandraQuery;
import org.apache.gora.cassandra.query.CassandraResult;
@@ -299,8 +300,7 @@ public class CassandraStore<K, T extends
break;
case ARRAY:
GenericArray array = (GenericArray) fieldValue;
- Type elementType = fieldSchema.getElementType().getType();
- GenericArray newArray = new ListGenericArray(Schema.create(elementType));
+ ListGenericArray newArray = new ListGenericArray(fieldSchema.getElementType());
Iterator iter = array.iterator();
while (iter.hasNext()) {
newArray.add(iter.next());
@@ -328,11 +328,13 @@ public class CassandraStore<K, T extends
Type type = schema.getType();
switch (type) {
case STRING:
+ case BOOLEAN:
case INT:
case LONG:
case BYTES:
case FLOAT:
case DOUBLE:
+ case FIXED:
this.cassandraClient.addColumn(key, field.name(), value);
break;
case RECORD:
@@ -344,16 +346,12 @@ public class CassandraStore<K, T extends
// TODO: hack, do not store empty arrays
Object memberValue = persistentBase.get(member.pos());
if (memberValue instanceof GenericArray<?>) {
- GenericArray<String> array = (GenericArray<String>) memberValue;
- if (array.size() == 0) {
+ if (((GenericArray)memberValue).size() == 0) {
continue;
}
}
-
- if (memberValue instanceof Utf8) {
- memberValue = memberValue.toString();
- }
- this.cassandraClient.addSubColumn(key, field.name(), StringSerializer.get().toByteBuffer(member.name()), memberValue);
+
+ this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
}
} else {
LOG.info("Record not supported: " + value.toString());
@@ -371,16 +369,12 @@ public class CassandraStore<K, T extends
// TODO: hack, do not store empty arrays
Object keyValue = map.get(mapKey);
if (keyValue instanceof GenericArray<?>) {
- GenericArray<String> array = (GenericArray<String>) keyValue;
- if (array.size() == 0) {
+ if (((GenericArray)keyValue).size() == 0) {
continue;
}
}
-
- if (keyValue instanceof Utf8) {
- keyValue = keyValue.toString();
- }
- this.cassandraClient.addSubColumn(key, field.name(), StringSerializer.get().toByteBuffer(mapKey.toString()), keyValue);
+
+ this.cassandraClient.addSubColumn(key, field.name(), mapKey.toString(), keyValue);
}
} else {
LOG.info("Map not supported: " + value.toString());
@@ -390,14 +384,7 @@ public class CassandraStore<K, T extends
case ARRAY:
if (value != null) {
if (value instanceof GenericArray<?>) {
- GenericArray<Object> array = (GenericArray<Object>) value;
- int i= 0;
- for (Object itemValue: array) {
- if (itemValue instanceof Utf8) {
- itemValue = itemValue.toString();
- }
- this.cassandraClient.addSubColumn(key, field.name(), IntegerSerializer.get().toByteBuffer(i++), itemValue);
- }
+ this.cassandraClient.addGenericArray(key, field.name(), (GenericArray)value);
} else {
LOG.info("Array not supported: " + value.toString());
}
Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java?rev=1361302&view=auto
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java (added)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java Fri Jul 13 17:11:38 2012
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.store;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.HSuperColumn;
+import me.prettyprint.hector.api.factory.HFactory;
+import me.prettyprint.hector.api.mutation.Mutator;
+import me.prettyprint.hector.api.Serializer;
+
+import org.apache.gora.persistency.Persistent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HectorUtils<K,T extends Persistent> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(HectorUtils.class);
+
+ public static<K> void insertColumn(Mutator<K> mutator, K key, String columnFamily, ByteBuffer columnName, ByteBuffer columnValue) {
+ mutator.insert(key, columnFamily, createColumn(columnName, columnValue));
+ }
+
+ public static<K> void insertColumn(Mutator<K> mutator, K key, String columnFamily, String columnName, ByteBuffer columnValue) {
+ mutator.insert(key, columnFamily, createColumn(columnName, columnValue));
+ }
+
+
+ public static<K> HColumn<ByteBuffer,ByteBuffer> createColumn(ByteBuffer name, ByteBuffer value) {
+ return HFactory.createColumn(name, value, ByteBufferSerializer.get(), ByteBufferSerializer.get());
+ }
+
+ public static<K> HColumn<String,ByteBuffer> createColumn(String name, ByteBuffer value) {
+ return HFactory.createColumn(name, value, StringSerializer.get(), ByteBufferSerializer.get());
+ }
+
+ public static<K> HColumn<Integer,ByteBuffer> createColumn(Integer name, ByteBuffer value) {
+ return HFactory.createColumn(name, value, IntegerSerializer.get(), ByteBufferSerializer.get());
+ }
+
+
+ public static<K> void insertSubColumn(Mutator<K> mutator, K key, String columnFamily, String superColumnName, ByteBuffer columnName, ByteBuffer columnValue) {
+ mutator.insert(key, columnFamily, createSuperColumn(superColumnName, columnName, columnValue));
+ }
+
+ public static<K> void insertSubColumn(Mutator<K> mutator, K key, String columnFamily, String superColumnName, String columnName, ByteBuffer columnValue) {
+ mutator.insert(key, columnFamily, createSuperColumn(superColumnName, columnName, columnValue));
+ }
+
+ public static<K> void insertSubColumn(Mutator<K> mutator, K key, String columnFamily, String superColumnName, Integer columnName, ByteBuffer columnValue) {
+ mutator.insert(key, columnFamily, createSuperColumn(superColumnName, columnName, columnValue));
+ }
+
+
+ public static<K> HSuperColumn<String,ByteBuffer,ByteBuffer> createSuperColumn(String superColumnName, ByteBuffer columnName, ByteBuffer columnValue) {
+ return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get());
+ }
+
+ public static<K> HSuperColumn<String,String,ByteBuffer> createSuperColumn(String superColumnName, String columnName, ByteBuffer columnValue) {
+ return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(), StringSerializer.get(), ByteBufferSerializer.get());
+ }
+
+ public static<K> HSuperColumn<String,Integer,ByteBuffer> createSuperColumn(String superColumnName, Integer columnName, ByteBuffer columnValue) {
+ return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(), IntegerSerializer.get(), ByteBufferSerializer.get());
+ }
+
+}