You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by ka...@apache.org on 2012/07/18 17:28:06 UTC
svn commit: r1362980 - in
/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra: query/
serializers/ store/
Author: kazk
Date: Wed Jul 18 15:28:06 2012
New Revision: 1362980
URL: http://svn.apache.org/viewvc?rev=1362980&view=rev
Log:
Fixes GORA-149 to add a serializer for MAP
Added:
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java
Modified:
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1362980&r1=1362979&r2=1362980&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Wed Jul 18 15:28:06 2012
@@ -38,7 +38,9 @@ import org.apache.avro.generic.GenericAr
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.gora.cassandra.serializers.GenericArraySerializer;
+import org.apache.gora.cassandra.serializers.StatefulHashMapSerializer;
import org.apache.gora.cassandra.serializers.TypeUtils;
+import org.apache.gora.persistency.StatefulHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,6 +78,10 @@ public class CassandraSubColumn extends
GenericArraySerializer serializer = GenericArraySerializer.get(fieldSchema.getElementType());
GenericArray genericArray = serializer.fromByteBuffer(byteBuffer);
value = genericArray;
+ } else if (type == Type.MAP) {
+ StatefulHashMapSerializer serializer = StatefulHashMapSerializer.get(fieldSchema.getValueType());
+ StatefulHashMap map = serializer.fromByteBuffer(byteBuffer);
+ value = map;
} else {
value = fromByteBuffer(fieldSchema, byteBuffer);
}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java?rev=1362980&r1=1362979&r2=1362980&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java Wed Jul 18 15:28:06 2012
@@ -37,6 +37,8 @@ import org.apache.avro.generic.GenericAr
import org.apache.avro.specific.SpecificFixed;
import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.StatefulHashMap;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,6 +81,16 @@ public class GoraSerializerTypeInferer {
schema = schema.getElementType();
}
serializer = GenericArraySerializer.get(schema);
+ } else if (value instanceof StatefulHashMap) {
+ StatefulHashMap map = (StatefulHashMap)value;
+ if (map.size() == 0) {
+ serializer = ByteBufferSerializer.get();
+ }
+ else {
+ Object value0 = map.values().iterator().next();
+ Schema schema = TypeUtils.getSchema(value0);
+ serializer = StatefulHashMapSerializer.get(schema);
+ }
} else {
serializer = SerializerTypeInferer.getSerializer(value);
}
@@ -134,6 +146,8 @@ public class GoraSerializerTypeInferer {
// serializer = SpecificFixedSerializer.get(schema);
} else if (type == Type.ARRAY) {
serializer = GenericArraySerializer.get(schema.getElementType());
+ } else if (type == Type.MAP) {
+ serializer = StatefulHashMapSerializer.get(schema.getValueType());
} else {
serializer = null;
}
@@ -182,6 +196,8 @@ public class GoraSerializerTypeInferer {
if (type == Type.ARRAY) {
serializer = GenericArraySerializer.get(elementType);
+ } else if (type == Type.MAP) {
+ serializer = StatefulHashMapSerializer.get(elementType);
} else {
serializer = null;
}
Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java?rev=1362980&view=auto
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java (added)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java Wed Jul 18 15:28:06 2012
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import me.prettyprint.cassandra.serializers.AbstractSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.hector.api.Serializer;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.specific.SpecificFixed;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.StatefulHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A StatefulHashMapSerializer translates the byte[] to and from StatefulHashMap of Avro.
+ */
+public class StatefulHashMapSerializer<T> extends AbstractSerializer<StatefulHashMap<Utf8, T>> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(StatefulHashMapSerializer.class);
+
+ private static Map<Type, StatefulHashMapSerializer> valueTypeToSerializerMap = new HashMap<Type, StatefulHashMapSerializer>();
+ private static Map<Class, StatefulHashMapSerializer> fixedClassToSerializerMap = new HashMap<Class, StatefulHashMapSerializer>();
+
+ public static StatefulHashMapSerializer get(Type valueType) {
+ StatefulHashMapSerializer serializer = valueTypeToSerializerMap.get(valueType);
+ if (serializer == null) {
+ serializer = new StatefulHashMapSerializer(valueType);
+ valueTypeToSerializerMap.put(valueType, serializer);
+ }
+ return serializer;
+ }
+
+ public static StatefulHashMapSerializer get(Type valueType, Class clazz) {
+ if (valueType != Type.FIXED) {
+ return null;
+ }
+ StatefulHashMapSerializer serializer = valueTypeToSerializerMap.get(clazz);
+ if (serializer == null) {
+ serializer = new StatefulHashMapSerializer(clazz);
+ fixedClassToSerializerMap.put(clazz, serializer);
+ }
+ return serializer;
+ }
+
+ public static StatefulHashMapSerializer get(Schema valueSchema) {
+ Type type = valueSchema.getType();
+ if (type == Type.FIXED) {
+ return get(Type.FIXED, TypeUtils.getClass(valueSchema));
+ } else {
+ return get(type);
+ }
+ }
+
+ private Schema valueSchema = null;
+ private Type valueType = null;
+ private int size = -1;
+ private Class<T> clazz = null;
+ private Serializer<T> valueSerializer = null;
+
+ public StatefulHashMapSerializer(Serializer<T> valueSerializer) {
+ this.valueSerializer = valueSerializer;
+ }
+
+ public StatefulHashMapSerializer(Schema valueSchema) {
+ this.valueSchema = valueSchema;
+ valueType = valueSchema.getType();
+ size = TypeUtils.getFixedSize(valueSchema);
+ valueSerializer = GoraSerializerTypeInferer.getSerializer(valueSchema);
+ }
+
+ public StatefulHashMapSerializer(Type valueType) {
+ this.valueType = valueType;
+ if (valueType != Type.FIXED) {
+ valueSchema = Schema.create(valueType);
+ }
+ clazz = TypeUtils.getClass(valueType);
+ size = TypeUtils.getFixedSize(valueType);
+ valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType);
+ }
+
+ public StatefulHashMapSerializer(Class<T> clazz) {
+ this.clazz = clazz;
+ valueType = TypeUtils.getType(clazz);
+ size = TypeUtils.getFixedSize(clazz);
+ if (valueType == null || valueType == Type.FIXED) {
+ valueType = Type.FIXED;
+ valueSchema = TypeUtils.getSchema(clazz);
+ valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType, clazz);
+ } else {
+ valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType);
+ }
+ }
+
+ @Override
+ public ByteBuffer toByteBuffer(StatefulHashMap<Utf8, T> map) {
+ if (map == null) {
+ return null;
+ }
+ if (size > 0) {
+ return toByteBufferWithFixedLengthElements(map);
+ } else {
+ return toByteBufferWithVariableLengthElements(map);
+ }
+ }
+
+ private ByteBuffer toByteBufferWithFixedLengthElements(StatefulHashMap<Utf8, T> map) {
+ int n = (int) map.size();
+ List<byte[]> list = new ArrayList<byte[]>(n);
+ n *= 4;
+ for (Utf8 key : map.keySet()) {
+ T value = map.get(key);
+ byte[] bytes = BytesArraySerializer.get().fromByteBuffer(Utf8Serializer.get().toByteBuffer(key));
+ list.add(bytes);
+ n += bytes.length;
+ bytes = BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(value));
+ list.add(bytes);
+ n += bytes.length;
+ }
+ ByteBuffer byteBuffer = ByteBuffer.allocate(n);
+ int i = 0;
+ for (byte[] bytes : list) {
+ if (i % 2 == 0) {
+ byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length));
+ }
+ byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes));
+ i += 1;
+ }
+ byteBuffer.rewind();
+ return byteBuffer;
+ }
+
+ private ByteBuffer toByteBufferWithVariableLengthElements(StatefulHashMap<Utf8, T> map) {
+ int n = (int) map.size();
+ List<byte[]> list = new ArrayList<byte[]>(n);
+ n *= 8;
+ for (Utf8 key : map.keySet()) {
+ T value = map.get(key);
+ byte[] bytes = BytesArraySerializer.get().fromByteBuffer(Utf8Serializer.get().toByteBuffer(key));
+ list.add(bytes);
+ n += bytes.length;
+ bytes = BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(value));
+ list.add(bytes);
+ n += bytes.length;
+ }
+ ByteBuffer byteBuffer = ByteBuffer.allocate(n);
+ for (byte[] bytes : list) {
+ byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length));
+ byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes));
+ }
+ byteBuffer.rewind();
+ return byteBuffer;
+ }
+
+ @Override
+ public StatefulHashMap<Utf8, T> fromByteBuffer(ByteBuffer byteBuffer) {
+ if (byteBuffer == null) {
+ return null;
+ }
+ StatefulHashMap<Utf8, T> map = new StatefulHashMap<Utf8, T>();
+int i = 0;
+ while (true) {
+ Utf8 key = null;
+ T value = null;
+ try {
+ int n = IntegerSerializer.get().fromByteBuffer(byteBuffer);
+ byte[] bytes = new byte[n];
+ byteBuffer.get(bytes, 0, n);
+ key = Utf8Serializer.get().fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
+
+ if (size > 0) {
+ value = valueSerializer.fromByteBuffer(byteBuffer);
+ }
+ else {
+ n = IntegerSerializer.get().fromByteBuffer(byteBuffer);
+ bytes = new byte[n];
+ byteBuffer.get(bytes, 0, n);
+ value = valueSerializer.fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
+ }
+ } catch (BufferUnderflowException e) {
+ break;
+ }
+ if (key == null) {
+ break;
+ }
+ if (value == null) {
+ break;
+ }
+ map.put(key, value);
+ }
+ return map;
+ }
+
+ @Override
+ public ComparatorType getComparatorType() {
+ return valueSerializer.getComparatorType();
+ }
+
+}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1362980&r1=1362979&r2=1362980&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Wed Jul 18 15:28:06 2012
@@ -56,6 +56,7 @@ import org.apache.gora.cassandra.seriali
import org.apache.gora.cassandra.serializers.TypeUtils;
import org.apache.gora.mapreduce.GoraRecordReader;
import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.StatefulHashMap;
import org.apache.gora.query.Query;
import org.apache.gora.util.ByteUtils;
import org.slf4j.Logger;
@@ -107,7 +108,7 @@ public class CassandraClient<K, T extend
List<ColumnFamilyDefinition> columnFamilyDefinitions = this.cassandraMapping.getColumnFamilyDefinitions();
keyspaceDefinition = HFactory.createKeyspaceDefinition(this.cassandraMapping.getKeyspaceName(), "org.apache.cassandra.locator.SimpleStrategy", 1, columnFamilyDefinitions);
this.cluster.addKeyspace(keyspaceDefinition, true);
- LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'");
+ // LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'");
// Create a customized Consistency Level
ConfigurableConsistencyLevel configurableConsistencyLevel = new ConfigurableConsistencyLevel();
@@ -200,6 +201,10 @@ public class CassandraClient<K, T extend
if (((GenericArray)itemValue).size() == 0) {
continue;
}
+ } else if (itemValue instanceof StatefulHashMap<?,?>) {
+ if (((StatefulHashMap)itemValue).size() == 0) {
+ continue;
+ }
}
addSubColumn(key, fieldName, i++, itemValue);
@@ -210,6 +215,32 @@ public class CassandraClient<K, T extend
}
}
+ @SuppressWarnings("unchecked")
+ public void addStatefulHashMap(K key, String fieldName, StatefulHashMap<Utf8,Object> map) {
+ if (isSuper( cassandraMapping.getFamily(fieldName) )) {
+ int i= 0;
+ for (Utf8 mapKey: map.keySet()) {
+
+ // TODO: hack, do not store empty arrays
+ Object mapValue = map.get(mapKey);
+ if (mapValue instanceof GenericArray<?>) {
+ if (((GenericArray)mapValue).size() == 0) {
+ continue;
+ }
+ } else if (mapValue instanceof StatefulHashMap<?,?>) {
+ if (((StatefulHashMap)mapValue).size() == 0) {
+ continue;
+ }
+ }
+
+ addSubColumn(key, fieldName, mapKey.toString(), mapValue);
+ }
+ }
+ else {
+ addColumn(key, fieldName, map);
+ }
+ }
+
/**
* Serialize value to ByteBuffer.
* @param value the member value
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java?rev=1362980&r1=1362979&r2=1362980&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java Wed Jul 18 15:28:06 2012
@@ -148,7 +148,7 @@ public class CassandraMapping {
if (superAttribute != null) {
// LOG.info("Located super column family");
this.superFamilies.add(familyName);
- LOG.info("Added super column family: '" + familyName + "'");
+ // LOG.info("Added super column family: '" + familyName + "'");
cfDef.setColumnType(ColumnType.SUPER);
cfDef.setSubComparatorType(ComparatorType.BYTESTYPE);
}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java?rev=1362980&r1=1362979&r2=1362980&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java Wed Jul 18 15:28:06 2012
@@ -104,13 +104,13 @@ public class CassandraMappingManager {
LOG.warn("Error locating Cassandra Keyspace element!");
}
else {
- LOG.info("Located Cassandra Keyspace: '" + KEYSPACE_ELEMENT + "'");
+ // LOG.info("Located Cassandra Keyspace: '" + KEYSPACE_ELEMENT + "'");
for (Element keyspace : keyspaces) {
String keyspaceName = keyspace.getAttributeValue(NAME_ATTRIBUTE);
if (keyspaceName == null) {
LOG.warn("Error locating Cassandra Keyspace name attribute!");
}
- LOG.info("Located Cassandra Keyspace name: '" + NAME_ATTRIBUTE + "'");
+ // LOG.info("Located Cassandra Keyspace name: '" + NAME_ATTRIBUTE + "'");
keyspaceMap.put(keyspaceName, keyspace);
}
}
@@ -121,14 +121,14 @@ public class CassandraMappingManager {
LOG.warn("Error locating Cassandra Mapping element!");
}
else {
- LOG.info("Located Cassandra Mapping: '" + MAPPING_ELEMENT + "'");
+ // LOG.info("Located Cassandra Mapping: '" + MAPPING_ELEMENT + "'");
for (Element mapping : mappings) {
String className = mapping.getAttributeValue(NAME_ATTRIBUTE);
if (className == null) {
LOG.warn("Error locating Cassandra Mapping class name attribute!");
continue;
}
- LOG.info("Located Cassandra Mapping class name: '" + NAME_ATTRIBUTE + "'");
+ // LOG.info("Located Cassandra Mapping class name: '" + NAME_ATTRIBUTE + "'");
mappingMap.put(className, mapping);
}
}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1362980&r1=1362979&r2=1362980&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Wed Jul 18 15:28:06 2012
@@ -278,8 +278,9 @@ public class CassandraStore<K, T extends
T p = (T) value.newInstance(new StateManagerImpl());
Schema schema = value.getSchema();
for (Field field: schema.getFields()) {
- if (value.isDirty(field.pos())) {
- Object fieldValue = value.get(field.pos());
+ int fieldPos = field.pos();
+ if (value.isDirty(fieldPos)) {
+ Object fieldValue = value.get(fieldPos);
// check if field has a nested structure (array, map, or record)
Schema fieldSchema = field.schema();
@@ -309,7 +310,7 @@ public class CassandraStore<K, T extends
break;
}
- p.put(field.pos(), fieldValue);
+ p.put(fieldPos, fieldValue);
}
}
@@ -349,6 +350,10 @@ public class CassandraStore<K, T extends
if (((GenericArray)memberValue).size() == 0) {
continue;
}
+ } else if (memberValue instanceof StatefulHashMap<?,?>) {
+ if (((StatefulHashMap)memberValue).size() == 0) {
+ continue;
+ }
}
this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
@@ -362,20 +367,7 @@ public class CassandraStore<K, T extends
case MAP:
if (value != null) {
if (value instanceof StatefulHashMap<?, ?>) {
- //TODO cast to stateful map and only write dirty keys
- Map<Utf8, Object> map = (Map<Utf8, Object>) value;
- for (Utf8 mapKey: map.keySet()) {
-
- // TODO: hack, do not store empty arrays
- Object keyValue = map.get(mapKey);
- if (keyValue instanceof GenericArray<?>) {
- if (((GenericArray)keyValue).size() == 0) {
- continue;
- }
- }
-
- this.cassandraClient.addSubColumn(key, field.name(), mapKey.toString(), keyValue);
- }
+ this.cassandraClient.addStatefulHashMap(key, field.name(), (StatefulHashMap<Utf8,Object>)value);
} else {
LOG.info("Map not supported: " + value.toString());
}