You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2014/03/06 00:06:42 UTC
svn commit: r1574715 - in /gora/branches/GORA_94:
gora-cassandra/src/main/java/org/apache/gora/cassandra/query/
gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/
gora-cassandra/src/main/java/org/apache/gora/cassandra/store/ gora-cassa...
Author: lewismc
Date: Wed Mar 5 23:06:41 2014
New Revision: 1574715
URL: http://svn.apache.org/r1574715
Log:
GORA-245-v6.1
Added:
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializerUtil.java
Modified:
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
gora/branches/GORA_94/gora-cassandra/src/test/conf/log4j-server.properties
gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
Modified: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Wed Mar 5 23:06:41 2014
@@ -30,6 +30,7 @@ import org.apache.avro.Schema.Type;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.gora.cassandra.serializers.AvroSerializerUtil;
import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,19 +49,6 @@ public abstract class CassandraColumn {
private Field field;
private int unionType;
- public static final ThreadLocal<BinaryDecoder> decoders =
- new ThreadLocal<BinaryDecoder>();
-
- /*
- * Create a threadlocal map for the datum readers and writers, because
- * they are not thread safe, at least not before Avro 1.4.0 (See AVRO-650).
- * When they are thread safe, it is possible to maintain a single reader and
- * writer pair for every schema, instead of one for every thread.
- */
-
- public static final ConcurrentHashMap<String, SpecificDatumReader<?>> readerMap =
- new ConcurrentHashMap<String, SpecificDatumReader<?>>();
-
public void setUnionType(int pUnionType){
this.unionType = pUnionType;
}
@@ -92,7 +80,6 @@ public abstract class CassandraColumn {
public abstract ByteBuffer getName();
public abstract Object getValue();
- @SuppressWarnings({ "rawtypes" })
protected Object fromByteBuffer(Schema schema, ByteBuffer byteBuffer) {
Object value = null;
Serializer<?> serializer = GoraSerializerTypeInferer.getSerializer(schema);
@@ -101,34 +88,14 @@ public abstract class CassandraColumn {
+ "could be found. Please report this to dev@gora.apache.org");
} else {
value = serializer.fromByteBuffer(byteBuffer);
- if (schema.getType().equals(Type.RECORD)){
- String schemaId = schema.getFullName();
-
- SpecificDatumReader<?> reader = (SpecificDatumReader<?>)readerMap.get(schemaId);
- if (reader == null) {
- reader = new SpecificDatumReader(schema);// ignore dirty bits
- SpecificDatumReader localReader=null;
- if((localReader=readerMap.putIfAbsent(schemaId, reader))!=null) {
- reader = localReader;
- }
- }
-
- // initialize a decoder, possibly reusing previous one
- BinaryDecoder decoderFromCache = decoders.get();
- BinaryDecoder decoder = DecoderFactory.get().binaryDecoder((byte[])value, null);
- // put in threadlocal cache if the initial get was empty
- if (decoderFromCache==null) {
- decoders.set(decoder);
- }
+ if (schema.getType().equals(Type.RECORD) || schema.getType().equals(Type.MAP) ){
try {
- value = reader.read(null, decoder);
+ value = AvroSerializerUtil.deserializer(value, schema);
} catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ LOG.warn(field.name() + " named field could not be deserialized.");
+ }
}
}
return value;
}
-
}
Modified: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Wed Mar 5 23:06:41 2014
@@ -52,12 +52,12 @@ public class CassandraSubColumn extends
List<?> genericArray = serializer.fromByteBuffer(byteBuffer);
value = genericArray;
} else if (type.equals(Type.MAP)) {
- MapSerializer<?> serializer = MapSerializer.get(fieldSchema.getValueType());
- Map<?, ?> map = serializer.fromByteBuffer(byteBuffer);
- value = map;
+// MapSerializer<?> serializer = MapSerializer.get(fieldSchema.getValueType());
+// Map<?, ?> map = serializer.fromByteBuffer(byteBuffer);
+// value = map;
+ value = fromByteBuffer(fieldSchema, byteBuffer);
} else if (type.equals(Type.RECORD)){
value = fromByteBuffer(fieldSchema, byteBuffer);
- //TODO: Avro dan geri getirmek lazim.
} else if (type.equals(Type.UNION)){
// the selected union schema is obtained
Schema unionFieldSchema = getUnionSchema(super.getUnionType(), fieldSchema);
Modified: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Wed Mar 5 23:06:41 2014
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.HSuperColumn;
@@ -31,6 +32,7 @@ import me.prettyprint.hector.api.beans.H
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
+import org.apache.avro.util.Utf8;
import org.apache.gora.cassandra.serializers.CharSequenceSerializer;
import org.apache.gora.cassandra.store.CassandraStore;
import org.apache.gora.persistency.impl.PersistentBase;
@@ -63,11 +65,25 @@ public class CassandraSuperColumn extend
break;
case MAP:
Map<CharSequence, Object> map = new HashMap<CharSequence, Object>();
-
+
for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
- Object memberValue = null;
- memberValue = fromByteBuffer(fieldSchema.getValueType(), hColumn.getValue());
- map.put(CharSequenceSerializer.get().fromByteBuffer(hColumn.getName()), memberValue);
+ CharSequence mapKey = CharSequenceSerializer.get().fromByteBuffer(hColumn.getName());
+ if (mapKey.toString().indexOf(CassandraStore.UNION_COL_SUFIX) < 0) {
+ Object memberValue = null;
+ // We need to detect the real type for UNION fields
+ if (fieldSchema.getValueType().getType().equals(Type.UNION)){
+
+ HColumn<ByteBuffer, ByteBuffer> cc = getUnionTypeColumn(mapKey
+ + CassandraStore.UNION_COL_SUFIX, this.hSuperColumn.getColumns());
+ Integer unionIndex = getUnionIndex(mapKey.toString(), cc);
+ Schema realSchema = fieldSchema.getValueType().getTypes().get(unionIndex);
+ memberValue = fromByteBuffer(realSchema, hColumn.getValue());
+
+ }else{
+ memberValue = fromByteBuffer(fieldSchema.getValueType(), hColumn.getValue());
+ }
+ map.put(mapKey, memberValue);
+ }
}
value = map;
@@ -116,17 +132,8 @@ public class CassandraSuperColumn extend
if (memberType.equals(Type.UNION)){
HColumn<ByteBuffer, ByteBuffer> hc = getUnionTypeColumn(memberField.name()
+ CassandraStore.UNION_COL_SUFIX, this.hSuperColumn.getColumns().toArray());
- Field unionField = new Field(memberField.name()
- + CassandraStore.UNION_COL_SUFIX, Schema.create(Type.INT),
- null, null);
-
- CassandraSubColumn unionColumn = new CassandraSubColumn();
-
- // get value of UNION stored type
- unionColumn.setField(unionField);
- unionColumn.setValue(hc);
- Object val = unionColumn.getValue();
- cassandraColumn.setUnionType(Integer.parseInt(val.toString()));
+ Integer unionIndex = getUnionIndex(memberField.name(),hc);
+ cassandraColumn.setUnionType(unionIndex);
}
record.put(record.getSchema().getField(memberName).pos(), cassandraColumn.getValue());
@@ -152,6 +159,16 @@ public class CassandraSuperColumn extend
return value;
}
+ private Integer getUnionIndex(String fieldName, HColumn<ByteBuffer, ByteBuffer> uc){
+ Integer val = IntegerSerializer.get().fromByteBuffer(uc.getValue());
+ return Integer.parseInt(val.toString());
+ }
+
+ private HColumn<ByteBuffer, ByteBuffer> getUnionTypeColumn(String fieldName,
+ List<HColumn<ByteBuffer, ByteBuffer>> columns) {
+ return getUnionTypeColumn(fieldName, columns.toArray());
+}
+
private HColumn<ByteBuffer, ByteBuffer> getUnionTypeColumn(String fieldName, Object[] hColumns) {
for (int iCnt = 0; iCnt < hColumns.length; iCnt++){
@SuppressWarnings("unchecked")
Added: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializerUtil.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializerUtil.java?rev=1574715&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializerUtil.java (added)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializerUtil.java Wed Mar 5 23:06:41 2014
@@ -0,0 +1,94 @@
+package org.apache.gora.cassandra.serializers;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+public class AvroSerializerUtil {
+
+ /**
+ * Threadlocals maintaining reusable binary decoders and encoders.
+ */
+ private static ThreadLocal<ByteArrayOutputStream> outputStream =
+ new ThreadLocal<ByteArrayOutputStream>();
+
+ public static final ThreadLocal<BinaryEncoder> encoders =
+ new ThreadLocal<BinaryEncoder>();
+
+ public static final ThreadLocal<BinaryDecoder> decoders =
+ new ThreadLocal<BinaryDecoder>();
+
+ /**
+ * Create a {@link java.util.concurrent.ConcurrentHashMap} for the
+ * datum readers and writers.
+ * This is necessary because they are not thread safe, at least not before
+ * Avro 1.4.0 (See AVRO-650).
+ * When they are thread safe, it is possible to maintain a single reader and
+ * writer pair for every schema, instead of one for every thread.
+ * @see <a href="https://issues.apache.org/jira/browse/AVRO-650">AVRO-650</a>
+ */
+ public static final ConcurrentHashMap<String, SpecificDatumWriter<?>> writerMap =
+ new ConcurrentHashMap<String, SpecificDatumWriter<?>>();
+
+ public static final ConcurrentHashMap<String, SpecificDatumReader<?>> readerMap =
+ new ConcurrentHashMap<String, SpecificDatumReader<?>>();
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public static <T> byte[] serializer(T value, Schema schema) throws IOException{
+ SpecificDatumWriter writer = (SpecificDatumWriter<?>) writerMap.get(schema.getFullName());
+ if (writer == null) {
+ writer = new SpecificDatumWriter(schema);// ignore dirty bits
+ writerMap.put(schema.getFullName(),writer);
+ }
+
+ BinaryEncoder encoderFromCache = encoders.get();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ outputStream.set(bos);
+ BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(bos, null);
+ if (encoderFromCache == null) {
+ encoders.set(encoder);
+ }
+
+ //reset the buffers
+ ByteArrayOutputStream os = outputStream.get();
+ os.reset();
+
+ writer.write(value, encoder);
+ encoder.flush();
+ byte[] byteValue = os.toByteArray();
+ return byteValue;
+ }
+
+ public static Object deserializer(Object value, Schema schema) throws IOException{
+ String schemaId = schema.getFullName();
+
+ SpecificDatumReader<?> reader = (SpecificDatumReader<?>)readerMap.get(schemaId);
+ if (reader == null) {
+ reader = new SpecificDatumReader(schema);// ignore dirty bits
+ SpecificDatumReader localReader=null;
+ if((localReader=readerMap.putIfAbsent(schemaId, reader))!=null) {
+ reader = localReader;
+ }
+ }
+
+ // initialize a decoder, possibly reusing previous one
+ BinaryDecoder decoderFromCache = decoders.get();
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder((byte[])value, null);
+ // put in threadlocal cache if the initial get was empty
+ if (decoderFromCache==null) {
+ decoders.set(decoder);
+ }
+
+ Object result = reader.read(null, decoder);
+ return result;
+
+ }
+}
Modified: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Wed Mar 5 23:06:41 2014
@@ -18,10 +18,10 @@
package org.apache.gora.cassandra.store;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -44,7 +44,6 @@ import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData.Array;
import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
@@ -63,6 +62,7 @@ import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.query.impl.PartitionQueryImpl;
import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.cassandra.serializers.AvroSerializerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +83,7 @@ public class CassandraStore<K, T extends
* Fixed string with value "UnionIndex" used to generate an extra column based on
* the original field's name
*/
- public static String UNION_COL_SUFIX = "UnionIndex";
+ public static String UNION_COL_SUFIX = "_UnionIndex";
/**
* Default schema index with value "0" used when AVRO Union data types are stored
@@ -99,12 +99,6 @@ public class CassandraStore<K, T extends
*/
private Map<K, T> buffer = Collections.synchronizedMap(new LinkedHashMap<K, T>());
- /**
- * Threadlocals maintaining reusable binary decoders and encoders.
- */
- private static ThreadLocal<ByteArrayOutputStream> outputStream =
- new ThreadLocal<ByteArrayOutputStream>();
-
public static final ThreadLocal<BinaryEncoder> encoders =
new ThreadLocal<BinaryEncoder>();
@@ -317,7 +311,9 @@ public class CassandraStore<K, T extends
query.setDataStore(this);
query.setKeyRange(key, key);
-
+ if (fields == null){
+ fields = this.getFields();
+ }
// Generating UnionFields
ArrayList<String> unionFields = new ArrayList<String>();
for (String field: fields){
@@ -332,13 +328,13 @@ public class CassandraStore<K, T extends
String[] both = (String[]) ArrayUtils.addAll(fields, arr);
query.setFields(both);
+
query.setLimit(1);
Result<K,T> result = execute(query);
boolean hasResult = false;
try {
hasResult = result.next();
} catch (Exception e) {
- // TODO Auto-generated catch block
e.printStackTrace();
}
return hasResult ? result.get() : null;
@@ -347,9 +343,7 @@ public class CassandraStore<K, T extends
@Override
public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
throws IOException {
- // TODO right now this just obtains a single partition
- // we need to obtain the correct splits for partitions in
- // order to achieve data locality.
+ // TODO GORA-298 Implement CassandraStore#getPartitions
List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>();
PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<K, T>(query);
pqi.setConf(getConf());
@@ -483,11 +477,11 @@ public class CassandraStore<K, T extends
* Add a field to Cassandra according to its type.
* @param key the key of the row where the field should be added
* @param field the Avro field representing a datum
+ * @param schema the schema belonging to the particular Avro field
* @param value the field value
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
private void addOrUpdateField(K key, Field field, Schema schema, Object value) {
- //Schema schema = field.schema();
Type type = schema.getType();
// checking if the value to be updated is used for saving union schema
if (field.name().indexOf(CassandraStore.UNION_COL_SUFIX) < 0){
@@ -505,75 +499,13 @@ public class CassandraStore<K, T extends
case RECORD:
if (value != null) {
if (value instanceof PersistentBase) {
- PersistentBase persistentBase = (PersistentBase) value;
-
- SpecificDatumWriter writer = (SpecificDatumWriter<?>) writerMap.get(schema.getFullName());
- if (writer == null) {
- writer = new SpecificDatumWriter(schema);// ignore dirty bits
- writerMap.put(schema.getFullName(),writer);
- }
-
- BinaryEncoder encoderFromCache = encoders.get();
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- outputStream.set(bos);
- BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(bos, null);
- if (encoderFromCache == null) {
- encoders.set(encoder);
- }
-
- //reset the buffers
- ByteArrayOutputStream os = outputStream.get();
- os.reset();
-
+ PersistentBase persistentBase = (PersistentBase) value;
try {
- writer.write(persistentBase, encoder);
- encoder.flush();
+ byte[] byteValue = AvroSerializerUtil.serializer(persistentBase, schema);
+ this.cassandraClient.addColumn(key, field.name(), byteValue);
} catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.warn(field.name() + " named record could not be serialized.");
}
- byte[] byteValue = os.toByteArray();
-
- //String familyName = this.cassandraClient.getCassandraMapping().getFamily(field.name());
-// if (this.cassandraClient.isSuper( familyName )){
-// this.cassandraClient.addSubColumn(key, columnName, columnName, schemaPos);
-// }else{
-//
-//
-// }
- this.cassandraClient.addColumn(key, field.name(), byteValue);
-
-// for (Field member: schema.getFields()) {
-// if (member.pos() == 0) {
-// continue;
-// }
-// // TODO: hack, do not store empty arrays
-// Object memberValue = persistentBase.get(member.pos());
-// if (memberValue instanceof List<?>) {
-// if (((List<?>)memberValue).size() == 0) {
-// continue;
-// }
-// } else if (memberValue instanceof Map<?,?>) {
-// if (((Map<?, ?>)memberValue).size() == 0) {
-// continue;
-// }
-// }
-// if (memberValue == null){
-// continue;
-// }
-//
-// // Get type for Union Fields
-// Schema memberSchema = member.schema();
-// Type fieldType = memberSchema.getType();
-// if (fieldType.equals(Type.UNION)){
-// int schemaPos = getUnionSchema(memberValue, memberSchema);
-// this.cassandraClient.addSubColumn(key, field.name(),
-// member.name()+UNION_COL_SUFIX, schemaPos);
-// }
-//
-// this.cassandraClient.addSubColumn(key, field.name(),
-// member.name(), memberValue);
-// }
} else {
LOG.warn("Record with value: " + value.toString() + " not supported for field: " + field.name());
}
@@ -581,8 +513,34 @@ public class CassandraStore<K, T extends
break;
case MAP:
if (value != null) {
- if (value instanceof Map<?, ?>) {
- this.cassandraClient.addStatefulHashMap(key, field.name(), (Map<CharSequence,Object>)value);
+ if (value instanceof Map<?, ?>) {
+ Map<CharSequence,Object> map = (Map<CharSequence,Object>)value;
+ Schema valueSchema = schema.getValueType();
+ Type valueType = valueSchema.getType();
+ if (Type.UNION.equals(valueType)){
+ Map<CharSequence,Object> valueMap = new HashMap<CharSequence, Object>();
+ for (CharSequence mapKey: map.keySet()) {
+ Object mapValue = map.get(mapKey);
+ int valueUnionIndex = getUnionSchema(mapValue, valueSchema);
+ valueMap.put((mapKey+UNION_COL_SUFIX), valueUnionIndex);
+ valueMap.put(mapKey, mapValue);
+ }
+ map = valueMap;
+ }
+
+ String familyName = this.cassandraClient.getCassandraMapping().getFamily(field.name());
+
+ // If the map is not stored in a super column, we use the Avro serializer.
+ if (!this.cassandraClient.isSuper( familyName )){
+ try {
+ byte[] byteValue = AvroSerializerUtil.serializer(map, schema);
+ this.cassandraClient.addColumn(key, field.name(), byteValue);
+ } catch (IOException e) {
+ LOG.warn(field.name() + " named map could not be serialized.");
+ }
+ }else{
+ this.cassandraClient.addStatefulHashMap(key, field.name(), map);
+ }
} else {
LOG.warn("Map with value: " + value.toString() + " not supported for field: " + field.name());
}
@@ -611,15 +569,15 @@ public class CassandraStore<K, T extends
String familyName = this.cassandraClient.getCassandraMapping().getFamily(field.name());
this.cassandraClient.getCassandraMapping().addColumn(familyName, columnName, columnName);
if (this.cassandraClient.isSuper( familyName )){
- this.cassandraClient.addSubColumn(key, columnName, columnName, schemaPos);
+ this.cassandraClient.addSubColumn(key, columnName, columnName, schemaPos);
}else{
this.cassandraClient.addColumn(key, columnName, schemaPos);
}
// this.cassandraClient.getCassandraMapping().addColumn(familyName, columnName, columnName);
// adding union value
- Schema unioSchema = schema.getTypes().get(schemaPos);
- addOrUpdateField(key, field, unioSchema, value);
+ Schema unionSchema = schema.getTypes().get(schemaPos);
+ addOrUpdateField(key, field, unionSchema, value);
//this.cassandraClient.addColumn(key, field.name(), value);
} else {
LOG.warn("Union with 'null' value not supported for field: " + field.name());
@@ -661,11 +619,11 @@ public class CassandraStore<K, T extends
else if (pValue instanceof Boolean && schemaType.equals(Type.BOOLEAN))
return unionSchemaPos;
else if (pValue instanceof Map && schemaType.equals(Type.MAP))
- return unionSchemaPos;
+ return unionSchemaPos;
else if (pValue instanceof List && schemaType.equals(Type.ARRAY))
- return unionSchemaPos;
+ return unionSchemaPos;
else if (pValue instanceof Persistent && schemaType.equals(Type.RECORD))
- return unionSchemaPos;
+ return unionSchemaPos;
unionSchemaPos ++;
}
// if we weren't able to determine which data type it is, then we return the default
Modified: gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml (original)
+++ gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml Wed Mar 5 23:06:41 2014
@@ -47,6 +47,7 @@
<field name="content" family="p" qualifier="p:cnt:c"/>
<field name="parsedContent" family="sc" qualifier="p:parsedContent"/>
<field name="outlinks" family="sc" qualifier="p:outlinks"/>
+ <field name="headers" family="sc" qualifier="p:headers"/>
<field name="metadata" family="p" qualifier="c:mt"/>
</class>
Modified: gora/branches/GORA_94/gora-cassandra/src/test/conf/log4j-server.properties
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/test/conf/log4j-server.properties?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/test/conf/log4j-server.properties (original)
+++ gora/branches/GORA_94/gora-cassandra/src/test/conf/log4j-server.properties Wed Mar 5 23:06:41 2014
@@ -42,3 +42,6 @@ log4j.appender.R.File=/var/log/cassandra
# Adding this to avoid thrift logging disconnect errors.
log4j.logger.org.apache.thrift.server.TNonblockingServer=ERROR
+# Added for gora-cassandra specific logging during tests
+log4j.logger.org.apache.gora.cassandra=DEBUG
+
Modified: gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java Wed Mar 5 23:06:41 2014
@@ -68,24 +68,27 @@ public class TestCassandraStore extends
}
- @Ignore("skipped until some bugs are fixed")
- @Override
- public void testGetWebPageDefaultFields() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
@Override
public void testQuery() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
@Override
public void testQueryStartKey() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
@Override
public void testQueryEndKey() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
@Override
public void testQueryKeyRange() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
+ @Override
+ public void testQueryWebPageSingleKey() throws IOException {}
+ @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
@Override
public void testQueryWebPageSingleKeyDefaultFields() throws IOException {}
+ @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
+ @Override
+ public void testQueryWebPageQueryEmptyResults() throws IOException {}
@Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented at CassandraStore, and always returns false or 0")
@Override
public void testDelete() throws IOException {}
@@ -98,17 +101,4 @@ public class TestCassandraStore extends
@Ignore("GORA-298 Implement CassandraStore#getPartitions")
@Override
public void testGetPartitions() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
- @Override
- public void testGetRecursive() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
- @Override
- public void testGetDoubleRecursive() throws IOException{}
- @Ignore("skipped until some bugs are fixed")
- @Override
- public void testGetNested() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
- @Override
- public void testGet3UnionField() throws IOException {}
-
}
Modified: gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java (original)
+++ gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java Wed Mar 5 23:06:41 2014
@@ -39,15 +39,15 @@ public class GoraTestDriver {
protected static final Logger log = LoggerFactory.getLogger(GoraTestDriver.class);
- protected Class<? extends DataStore> dataStoreClass;
+ protected Class<? extends DataStore<?, ?>> dataStoreClass;
protected Configuration conf = new Configuration();
@SuppressWarnings("rawtypes")
protected HashSet<DataStore> dataStores;
- @SuppressWarnings("rawtypes")
+ @SuppressWarnings({ "rawtypes", "unchecked" })
protected GoraTestDriver(Class<? extends DataStore> dataStoreClass) {
- this.dataStoreClass = dataStoreClass;
+ this.dataStoreClass = (Class<? extends DataStore<?, ?>>) dataStoreClass;
this.dataStores = new HashSet<DataStore>();
}
@@ -70,7 +70,7 @@ public class GoraTestDriver {
*/
public void setUp() throws Exception {
log.info("setting up test");
- for(DataStore store : dataStores) {
+ for(DataStore<?, ?> store : dataStores) {
store.truncateSchema();
}
}
Modified: gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java (original)
+++ gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java Wed Mar 5 23:06:41 2014
@@ -587,7 +587,8 @@ public class DataStoreTestUtil {
for (int i = 0; i < urls.length; i++) {
WebPage webPage = WebPage.newBuilder().build();
webPage.setUrl(new Utf8(urls[i]));
- //test put for nullable map field
+ //test put for nullable map field
+ // we put data to the 'headers' field which is a Map with default value of 'null'
webPage.setHeaders(new HashMap<CharSequence, CharSequence>());
for (int j = 0; j < headers.length; j += 2) {
webPage.getHeaders().put(new Utf8(header + j), new Utf8(headers[j]));
@@ -670,7 +671,6 @@ public class DataStoreTestUtil {
String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
"http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
- String anchor = "anchor";
String header = "header";
String[] headers = { "firstHeader", "secondHeader", "thirdHeader",
"fourthHeader", "fifthHeader", "sixthHeader" };
Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java Wed Mar 5 23:06:41 2014
@@ -21,7 +21,6 @@ package org.apache.gora.hbase.query;
import java.io.IOException;
import org.apache.gora.hbase.store.HBaseStore;
-import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
import org.apache.hadoop.hbase.client.Get;
Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java Wed Mar 5 23:06:41 2014
@@ -23,7 +23,6 @@ import static org.apache.gora.hbase.util
import java.io.IOException;
import org.apache.gora.hbase.store.HBaseStore;
-import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
import org.apache.gora.query.impl.ResultBase;
Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Wed Mar 5 23:06:41 2014
@@ -36,7 +36,6 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.util.Utf8;
-
import org.apache.gora.hbase.query.HBaseGetResult;
import org.apache.gora.hbase.query.HBaseQuery;
import org.apache.gora.hbase.query.HBaseScannerResult;
@@ -51,7 +50,6 @@ import org.apache.gora.query.Query;
import org.apache.gora.query.impl.PartitionQueryImpl;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.store.impl.DataStoreBase;
-
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -66,11 +64,9 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.input.SAXBuilder;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -235,7 +231,6 @@ implements Configurable {
* @param persistent
* Record to be persisted in HBase
*/
- @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void put(K key, T persistent) {
try {
@@ -297,8 +292,9 @@ implements Configurable {
}
break;
case MAP:
+ @SuppressWarnings({ "rawtypes", "unchecked" })
Set<Entry> set = ((Map) o).entrySet();
- for (Entry entry : set) {
+ for (@SuppressWarnings("rawtypes") Entry entry : set) {
byte[] qual = toBytes(entry.getKey());
addPutsAndDeletes(put, delete, entry.getValue(), schema.getValueType()
.getType(), schema.getValueType(), hcol, qual);
@@ -597,7 +593,6 @@ implements Configurable {
}
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
/**
* Creates a new Persistent instance with the values in 'result' for the fields listed.
* @param result result form a HTable#get()
@@ -649,7 +644,7 @@ implements Configurable {
return;
}
Schema valueSchema = fieldSchema.getValueType();
- Map map = new HashMap();
+ Map<Utf8, Object> map = new HashMap<Utf8, Object>();
for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
map.put(new Utf8(Bytes.toString(e.getKey())),
fromBytes(valueSchema, e.getValue()));
@@ -662,8 +657,8 @@ implements Configurable {
return;
}
valueSchema = fieldSchema.getElementType();
- ArrayList arrayList = new ArrayList();
- DirtyListWrapper dirtyListWrapper = new DirtyListWrapper(arrayList);
+ ArrayList<Object> arrayList = new ArrayList<Object>();
+ DirtyListWrapper<Object> dirtyListWrapper = new DirtyListWrapper<Object>(arrayList);
for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
dirtyListWrapper.add(fromBytes(valueSchema, e.getValue()));
}