You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by rm...@apache.org on 2014/04/12 21:21:56 UTC
svn commit: r1586888 [3/10] - in /gora/trunk: ./ bin/ gora-accumulo/
gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/
gora-accumulo/src/main/java/org/apache/gora/accumulo/query/
gora-accumulo/src/main/java/org/apache/gora/accumulo/store/ ...
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Sat Apr 12 19:21:53 2014
@@ -21,6 +21,7 @@ package org.apache.gora.cassandra.store;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -28,6 +29,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
@@ -40,30 +42,34 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData.Array;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.gora.cassandra.query.CassandraQuery;
import org.apache.gora.cassandra.query.CassandraResult;
import org.apache.gora.cassandra.query.CassandraResultSet;
import org.apache.gora.cassandra.query.CassandraRow;
import org.apache.gora.cassandra.query.CassandraSubColumn;
import org.apache.gora.cassandra.query.CassandraSuperColumn;
-import org.apache.gora.persistency.ListGenericArray;
-import org.apache.gora.persistency.StatefulHashMap;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
import org.apache.gora.persistency.impl.PersistentBase;
-import org.apache.gora.persistency.impl.StateManagerImpl;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.query.impl.PartitionQueryImpl;
-import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.cassandra.serializers.AvroSerializerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link org.apache.gora.cassandra.store.CassandraStore} is the primary class
* responsible for directing Gora CRUD operations into Cassandra. We (delegate) rely
- * heavily on {@ link org.apache.gora.cassandra.store.CassandraClient} for many operations
+ * heavily on {@link org.apache.gora.cassandra.store.CassandraClient} for many operations
* such as initialization, creating and deleting schemas (Cassandra Keyspaces), etc.
*/
public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
@@ -71,38 +77,45 @@ public class CassandraStore<K, T extends
/** Logging implementation */
public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
- /** Consistency property level for Cassandra column families */
- private static final String COL_FAM_CL = "cf.consistency.level";
+ private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>();
- /** Consistency property level for Cassandra read operations. */
- private static final String READ_OP_CL = "read.consistency.level";
-
- /** Consistency property level for Cassandra write operations. */
- private static final String WRITE_OP_CL = "write.consistency.level";
-
- /** Variables to hold different consistency levels defined by the properties. */
- public static String colFamConsLvl;
- public static String readOpConsLvl;
- public static String writeOpConsLvl;
-
- private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>();
+ /**
+ * Fixed string with value "_UnionIndex" used to generate an extra column based on
+ * the original field's name
+ */
+ public static String UNION_COL_SUFIX = "_UnionIndex";
/**
- * Default schema index used when AVRO Union data types are stored
+ * Default schema index with value "0" used when AVRO Union data types are stored
*/
public static int DEFAULT_UNION_SCHEMA = 0;
-
+
/**
* The values are Avro fields pending to be stored.
*
* We want to iterate over the keys in insertion order.
- * We don't want to lock the entire collection before iterating over the keys, since in the meantime other threads are adding entries to the map.
+ * We don't want to lock the entire collection before iterating over the keys,
+ * since in the meantime other threads are adding entries to the map.
*/
private Map<K, T> buffer = Collections.synchronizedMap(new LinkedHashMap<K, T>());
+ public static final ThreadLocal<BinaryEncoder> encoders =
+ new ThreadLocal<BinaryEncoder>();
+
+ /**
+ * Create a {@link java.util.concurrent.ConcurrentHashMap} for the
+ * datum readers and writers.
+ * This is necessary because they are not thread safe, at least not before
+ * Avro 1.4.0 (See AVRO-650).
+ * When they are thread safe, it is possible to maintain a single reader and
+ * writer pair for every schema, instead of one for every thread.
+ * @see <a href="https://issues.apache.org/jira/browse/AVRO-650">AVRO-650</a>
+ */
+ public static final ConcurrentHashMap<String, SpecificDatumWriter<?>> writerMap =
+ new ConcurrentHashMap<String, SpecificDatumWriter<?>>();
+
/** The default constructor for CassandraStore */
public CassandraStore() throws Exception {
- // this.cassandraClient.initialize();
}
/**
@@ -114,14 +127,6 @@ public class CassandraStore<K, T extends
public void initialize(Class<K> keyClass, Class<T> persistent, Properties properties) {
try {
super.initialize(keyClass, persistent, properties);
- if (autoCreateSchema) {
- // If this is not set, then each Cassandra client should set its default
- // column family
- colFamConsLvl = DataStoreFactory.findProperty(properties, this, COL_FAM_CL, null);
- // operations
- readOpConsLvl = DataStoreFactory.findProperty(properties, this, READ_OP_CL, null);
- writeOpConsLvl = DataStoreFactory.findProperty(properties, this, WRITE_OP_CL, null);
- }
this.cassandraClient.initialize(keyClass, persistent);
} catch (Exception e) {
LOG.error(e.getMessage());
@@ -143,7 +148,6 @@ public class CassandraStore<K, T extends
@Override
public boolean delete(K key) {
- LOG.debug("delete " + key);
this.cassandraClient.deleteByKey(key);
return true;
}
@@ -162,7 +166,7 @@ public class CassandraStore<K, T extends
/**
* When executing Gora Queries in Cassandra we query the Cassandra keyspace by families.
- * When add sub/supercolumns, Gora keys are mapped to Cassandra partition keys only.
+ * When we add sub/supercolumns, Gora keys are mapped to Cassandra partition keys only.
* This is because we follow the Cassandra logic where column family data is
* partitioned across nodes based on row Key.
*/
@@ -238,14 +242,14 @@ public class CassandraStore<K, T extends
* partitioned across nodes based on row Key.
*/
private void addSuperColumns(String family, CassandraQuery<K, T> cassandraQuery,
- CassandraResultSet cassandraResultSet) {
+ CassandraResultSet<K> cassandraResultSet) {
List<SuperRow<K, String, ByteBuffer, ByteBuffer>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family);
for (SuperRow<K, String, ByteBuffer, ByteBuffer> superRow: superRows) {
K key = superRow.getKey();
CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key);
if (cassandraRow == null) {
- cassandraRow = new CassandraRow();
+ cassandraRow = new CassandraRow<K>();
cassandraResultSet.putRow(key, cassandraRow);
cassandraRow.setKey(key);
}
@@ -261,7 +265,11 @@ public class CassandraStore<K, T extends
}
/**
- * Flush the buffer. Write the buffered rows.
+ * Flush the buffer which is a synchronized {@link java.util.LinkedHashMap}
+ * storing fields pending to be stored by
+ * {@link org.apache.gora.cassandra.store.CassandraStore#put(Object, PersistentBase)}
+ * operations. Invoking this method therefore writes the buffered rows
+ * into Cassandra.
* @see org.apache.gora.store.DataStore#flush()
*/
@Override
@@ -270,24 +278,28 @@ public class CassandraStore<K, T extends
Set<K> keys = this.buffer.keySet();
// this duplicates memory footprint
+ @SuppressWarnings("unchecked")
K[] keyArray = (K[]) keys.toArray();
- // iterating over the key set directly would throw ConcurrentModificationException with java.util.HashMap and subclasses
+ // iterating over the key set directly would throw
+ // ConcurrentModificationException with java.util.HashMap and subclasses
for (K key: keyArray) {
T value = this.buffer.get(key);
if (value == null) {
- LOG.info("Value to update is null for key " + key);
+ LOG.info("Value to update is null for key: " + key);
continue;
}
Schema schema = value.getSchema();
+
for (Field field: schema.getFields()) {
if (value.isDirty(field.pos())) {
- addOrUpdateField(key, field, value.get(field.pos()));
+ addOrUpdateField(key, field, field.schema(), value.get(field.pos()));
}
}
}
- // remove flushed rows
+ // remove flushed rows from the buffer as all
+ // added or updated fields should now have been written.
for (K key: keyArray) {
this.buffer.remove(key);
}
@@ -298,14 +310,31 @@ public class CassandraStore<K, T extends
CassandraQuery<K,T> query = new CassandraQuery<K,T>();
query.setDataStore(this);
query.setKeyRange(key, key);
- query.setFields(fields);
+
+ if (fields == null){
+ fields = this.getFields();
+ }
+ // Generating UnionFields
+ ArrayList<String> unionFields = new ArrayList<String>();
+ for (String field: fields){
+ Field schemaField =this.fieldMap.get(field);
+ Type type = schemaField.schema().getType();
+ if (type.getName().equals("UNION".toLowerCase())){
+ unionFields.add(field+UNION_COL_SUFIX);
+ }
+ }
+
+ String[] arr = unionFields.toArray(new String[unionFields.size()]);
+ String[] both = (String[]) ArrayUtils.addAll(fields, arr);
+
+ query.setFields(both);
+
query.setLimit(1);
Result<K,T> result = execute(query);
boolean hasResult = false;
try {
hasResult = result.next();
} catch (Exception e) {
- // TODO Auto-generated catch block
e.printStackTrace();
}
return hasResult ? result.get() : null;
@@ -314,7 +343,7 @@ public class CassandraStore<K, T extends
@Override
public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
throws IOException {
- // just a single partition
+ // TODO GORA-298 Implement CassandraStore#getPartitions
List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>();
PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<K, T>(query);
pqi.setConf(getConf());
@@ -339,168 +368,287 @@ public class CassandraStore<K, T extends
}
/**
- * Duplicate instance to keep all the objects in memory till flushing.
- * @see org.apache.gora.store.DataStore#put(java.lang.Object, org.apache.gora.persistency.Persistent)
+ *
+ * When doing the
+ * {@link org.apache.gora.cassandra.store.CassandraStore#put(Object, PersistentBase)}
+ * operation, the logic is as follows:
+ * <ol>
+ * <li>Obtain the Avro {@link org.apache.avro.Schema} for the object.</li>
+ * <li>Create a new duplicate instance of the object (explained in more detail below) **.</li>
+ * <li>Obtain a {@link java.util.List} of the {@link org.apache.avro.Schema}
+ * {@link org.apache.avro.Schema.Field}'s.</li>
+ * <li>Iterate through the field {@link java.util.List}. This allows us to
+ * process each item in turn.</li>
+ * <li>Check to see if the {@link org.apache.avro.Schema.Field} is NOT dirty.
+ * If this condition is true then we DO NOT process this field.</li>
+ * <li>Obtain the element at the specified position in this list so we can
+ * directly operate on it.</li>
+ * <li>Obtain the {@link org.apache.avro.Schema.Type} of the element obtained
+ * above and process it accordingly. N.B. For nested type ARRAY, MAP,
+ * RECORD or UNION, we shadow the checks in bullet point 5 above to infer that the
+ * {@link org.apache.avro.Schema.Field} is either at
+ * position 0 OR it is NOT dirty. If one of these conditions is true then we DO NOT
+ * process this field. This is carried out in
+ * {@link org.apache.gora.cassandra.store.CassandraStore#getFieldValue(Schema, Type, Object)}</li>
+ * <li>We then insert the Key and Object into the {@link java.util.LinkedHashMap} buffer
+ * before being flushed. This performs a structural modification of the map.</li>
+ * </ol>
+ * ** We create a duplicate instance of the object to be persisted and insert processed
+ * objects into a synchronized {@link java.util.LinkedHashMap}. This allows
+ * us to keep all the objects in memory till flushing.
+ * @see org.apache.gora.store.DataStore#put(java.lang.Object,
+ * org.apache.gora.persistency.Persistent)
+ * @param key for the Avro Record (object).
+ * @param value Record object to be persisted in Cassandra
*/
@Override
public void put(K key, T value) {
- T p = (T) value.newInstance(new StateManagerImpl());
Schema schema = value.getSchema();
- for (Field field: schema.getFields()) {
- int fieldPos = field.pos();
- if (value.isDirty(fieldPos)) {
- Object fieldValue = value.get(fieldPos);
-
- // check if field has a nested structure (array, map, or record)
- Schema fieldSchema = field.schema();
- Type type = fieldSchema.getType();
- switch(type) {
- case RECORD:
- PersistentBase persistent = (PersistentBase) fieldValue;
- PersistentBase newRecord = (PersistentBase) persistent.newInstance(new StateManagerImpl());
- for (Field member: fieldSchema.getFields()) {
- newRecord.put(member.pos(), persistent.get(member.pos()));
- }
- fieldValue = newRecord;
- break;
- case MAP:
- StatefulHashMap map = (StatefulHashMap) fieldValue;
- StatefulHashMap newMap = new StatefulHashMap();
- for (Object mapKey : map.keySet()) {
- newMap.put(mapKey, map.get(mapKey));
- newMap.putState(mapKey, map.getState(mapKey));
- }
- fieldValue = newMap;
- break;
- case ARRAY:
- GenericArray array = (GenericArray) fieldValue;
- ListGenericArray newArray = new ListGenericArray(fieldSchema.getElementType());
- Iterator iter = array.iterator();
- while (iter.hasNext()) {
- newArray.add(iter.next());
- }
- fieldValue = newArray;
- break;
- case UNION:
- // storing the union selected schema, the actual value will be stored as soon as getting out of here
- // TODO determine which schema we are using: int schemaPos = getUnionSchema(fieldValue,fieldSchema);
- // and save it p.put( p.getFieldIndex(field.name() + CassandraStore.UNION_COL_SUFIX), schemaPos);
- break;
- }
-
- p.put(fieldPos, fieldValue);
+ @SuppressWarnings("unchecked")
+ T p = (T) SpecificData.get().newRecord(value, schema);
+ List<Field> fields = schema.getFields();
+ for (int i = 1; i < fields.size(); i++) {
+ if (!value.isDirty(i)) {
+ continue;
}
+ Field field = fields.get(i);
+ Type type = field.schema().getType();
+ Object fieldValue = value.get(field.pos());
+ Schema fieldSchema = field.schema();
+ // check if field has a nested structure (array, map, record or union)
+ fieldValue = getFieldValue(fieldSchema, type, fieldValue);
+ p.put(field.pos(), fieldValue);
}
-
// this performs a structural modification of the map
this.buffer.put(key, p);
}
/**
+ * For every field within an object, we pass in a field schema, Type and value.
+ * This enables us to process fields (based on their characteristics)
+ * preparing them for persistence.
+ * @param fieldSchema the associated field schema
+ * @param type the field type
+ * @param fieldValue the field value.
+ * @return the field value to persist; RECORD values are copied into a new record
+ */
+ private Object getFieldValue(Schema fieldSchema, Type type, Object fieldValue ){
+ switch(type) {
+ case RECORD:
+ Persistent persistent = (Persistent) fieldValue;
+ Persistent newRecord = (Persistent) SpecificData.get().newRecord(persistent, persistent.getSchema());
+ for (Field member: fieldSchema.getFields()) {
+ if (member.pos() == 0 || !persistent.isDirty()) {
+ continue;
+ }
+ Schema memberSchema = member.schema();
+ Type memberType = memberSchema.getType();
+ Object memberValue = persistent.get(member.pos());
+ newRecord.put(member.pos(), getFieldValue(memberSchema, memberType, memberValue));
+ }
+ fieldValue = newRecord;
+ break;
+ case MAP:
+ Map<?, ?> map = (Map<?, ?>) fieldValue;
+ fieldValue = map;
+ break;
+ case ARRAY:
+ fieldValue = (List<?>) fieldValue;
+ break;
+ case UNION:
+ // storing the union selected schema, the actual value will
+ // be stored as soon as we break out of the switch.
+ if (fieldValue != null){
+ int schemaPos = getUnionSchema(fieldValue,fieldSchema);
+ Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
+ Type unionType = unionSchema.getType();
+ fieldValue = getFieldValue(unionSchema, unionType, fieldValue);
+ }
+ //p.put( schemaPos, p.getSchema().getField(field.name() + CassandraStore.UNION_COL_SUFIX));
+ //p.put(fieldPos, fieldValue);
+ break;
+ default:
+ break;
+ }
+ return fieldValue;
+ }
+
+ /**
* Add a field to Cassandra according to its type.
* @param key the key of the row where the field should be added
* @param field the Avro field representing a datum
+ * @param schema the schema belonging to the particular Avro field
* @param value the field value
*/
- private void addOrUpdateField(K key, Field field, Object value) {
- Schema schema = field.schema();
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private void addOrUpdateField(K key, Field field, Schema schema, Object value) {
Type type = schema.getType();
- switch (type) {
- case STRING:
- case BOOLEAN:
- case INT:
- case LONG:
- case BYTES:
- case FLOAT:
- case DOUBLE:
- case FIXED:
- this.cassandraClient.addColumn(key, field.name(), value);
- break;
- case RECORD:
- if (value != null) {
- if (value instanceof PersistentBase) {
- PersistentBase persistentBase = (PersistentBase) value;
- for (Field member: schema.getFields()) {
-
- // TODO: hack, do not store empty arrays
- Object memberValue = persistentBase.get(member.pos());
- if (memberValue instanceof GenericArray<?>) {
- if (((GenericArray)memberValue).size() == 0) {
- continue;
+ // checking if the value to be updated is used for saving union schema
+ if (field.name().indexOf(CassandraStore.UNION_COL_SUFIX) < 0){
+ switch (type) {
+ case STRING:
+ case BOOLEAN:
+ case INT:
+ case LONG:
+ case BYTES:
+ case FLOAT:
+ case DOUBLE:
+ case FIXED:
+ this.cassandraClient.addColumn(key, field.name(), value);
+ break;
+ case RECORD:
+ if (value != null) {
+ if (value instanceof PersistentBase) {
+ PersistentBase persistentBase = (PersistentBase) value;
+ try {
+ byte[] byteValue = AvroSerializerUtil.serializer(persistentBase, schema);
+ this.cassandraClient.addColumn(key, field.name(), byteValue);
+ } catch (IOException e) {
+ LOG.warn(field.name() + " named record could not be serialized.");
+ }
+ } else {
+ LOG.warn("Record with value: " + value.toString() + " not supported for field: " + field.name());
+ }
+ } else {
+ LOG.warn("Setting content of: " + field.name() + " to null.");
+ String familyName = this.cassandraClient.getCassandraMapping().getFamily(field.name());
+ this.cassandraClient.deleteColumn(key, familyName, this.cassandraClient.toByteBuffer(field.name()));
+ }
+ break;
+ case MAP:
+ if (value != null) {
+ if (value instanceof Map<?, ?>) {
+ Map<CharSequence,Object> map = (Map<CharSequence,Object>)value;
+ Schema valueSchema = schema.getValueType();
+ Type valueType = valueSchema.getType();
+ if (Type.UNION.equals(valueType)){
+ Map<CharSequence,Object> valueMap = new HashMap<CharSequence, Object>();
+ for (CharSequence mapKey: map.keySet()) {
+ Object mapValue = map.get(mapKey);
+ int valueUnionIndex = getUnionSchema(mapValue, valueSchema);
+ valueMap.put((mapKey+UNION_COL_SUFIX), valueUnionIndex);
+ valueMap.put(mapKey, mapValue);
}
+ map = valueMap;
}
- this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
+
+ String familyName = this.cassandraClient.getCassandraMapping().getFamily(field.name());
+
+ // If the map's family is not a super column family, we use the Avro serializer.
+ if (!this.cassandraClient.isSuper( familyName )){
+ try {
+ byte[] byteValue = AvroSerializerUtil.serializer(map, schema);
+ this.cassandraClient.addColumn(key, field.name(), byteValue);
+ } catch (IOException e) {
+ LOG.warn(field.name() + " named map could not be serialized.");
+ }
+ }else{
+ this.cassandraClient.addStatefulHashMap(key, field.name(), map);
+ }
+ } else {
+ LOG.warn("Map with value: " + value.toString() + " not supported for field: " + field.name());
}
} else {
- LOG.warn("Record with value: " + value.toString() + " not supported for field: " + field.name());
+ // delete map
+ LOG.warn("Setting content of: " + field.name() + " to null.");
+ this.cassandraClient.deleteStatefulHashMap(key, field.name());
}
- }
- break;
- case MAP:
- if (value != null) {
- if (value instanceof StatefulHashMap<?, ?>) {
- this.cassandraClient.addStatefulHashMap(key, field.name(), (StatefulHashMap<Utf8,Object>)value);
+ break;
+ case ARRAY:
+ if (value != null) {
+ if (value instanceof DirtyListWrapper<?>) {
+ DirtyListWrapper fieldValue = (DirtyListWrapper<?>)value;
+ GenericArray valueArray = new Array(fieldValue.size(), schema);
+ for (int i = 0; i < fieldValue.size(); i++) {
+ valueArray.add(i, fieldValue.get(i));
+ }
+ this.cassandraClient.addGenericArray(key, field.name(), (GenericArray<?>)valueArray);
+ } else {
+ LOG.warn("Array with value: " + value.toString() + " not supported for field: " + field.name());
+ }
} else {
- LOG.warn("Map with value: " + value.toString() + " not supported for field: " + field.name());
+ LOG.warn("Setting content of: " + field.name() + " to null.");
+ this.cassandraClient.deleteGenericArray(key, field.name());
}
- }
- break;
- case ARRAY:
- if (value != null) {
- if (value instanceof GenericArray<?>) {
- this.cassandraClient.addGenericArray(key, field.name(), (GenericArray)value);
+ break;
+ case UNION:
+ // adding union schema index
+ String columnName = field.name() + UNION_COL_SUFIX;
+ String familyName = this.cassandraClient.getCassandraMapping().getFamily(field.name());
+ if(value != null) {
+ int schemaPos = getUnionSchema(value, schema);
+ LOG.debug("Union with value: " + value.toString() + " at index: " + schemaPos + " supported for field: " + field.name());
+ this.cassandraClient.getCassandraMapping().addColumn(familyName, columnName, columnName);
+ if (this.cassandraClient.isSuper( familyName )){
+ this.cassandraClient.addSubColumn(key, columnName, columnName, schemaPos);
+ }else{
+ this.cassandraClient.addColumn(key, columnName, schemaPos);
+
+ }
+ //this.cassandraClient.getCassandraMapping().addColumn(familyName, columnName, columnName);
+ // adding union value
+ Schema unionSchema = schema.getTypes().get(schemaPos);
+ addOrUpdateField(key, field, unionSchema, value);
+ //this.cassandraClient.addColumn(key, field.name(), value);
} else {
- LOG.warn("Array with value: " + value.toString() + " not supported for field: " + field.name());
+ LOG.warn("Setting content of: " + field.name() + " to null.");
+ if (this.cassandraClient.isSuper( familyName )){
+ this.cassandraClient.deleteSubColumn(key, field.name());
+ } else {
+ this.cassandraClient.deleteColumn(key, familyName, this.cassandraClient.toByteBuffer(field.name()));
+ }
}
+ break;
+ default:
+ LOG.warn("Type: " + type.name() + " not considered for field: " + field.name() + ". Please report this to dev@gora.apache.org");
}
- break;
- case UNION:
- if(value != null) {
- LOG.debug("Union with value: " + value.toString() + " at index: " + getUnionSchema(value, schema) + " supported for field: " + field.name());
- this.cassandraClient.addColumn(key, field.name(), value);
- } else {
- LOG.warn("Union with 'null' value not supported for field: " + field.name());
- }
- default:
- LOG.warn("Type: " + type.name() + " not considered for field: " + field.name() + ". Please report this to dev@gora.apache.org");
}
}
/**
- * Gets the position within the schema of the type used
+ * Given an object and the object schema this function obtains,
+ * from within the UNION schema, the position of the type used.
+ * If no data type can be inferred then we return a default value
+ * of position 0.
* @param pValue
* @param pUnionSchema
- * @return
+ * @return the unionSchemaPosition.
*/
private int getUnionSchema(Object pValue, Schema pUnionSchema){
int unionSchemaPos = 0;
- String valueType = pValue.getClass().getSimpleName();
+// String valueType = pValue.getClass().getSimpleName();
Iterator<Schema> it = pUnionSchema.getTypes().iterator();
while ( it.hasNext() ){
- String schemaName = it.next().getName();
- if (valueType.equals("Utf8") && schemaName.equals(Type.STRING.name().toLowerCase()))
+ Type schemaType = it.next().getType();
+ if (pValue instanceof Utf8 && schemaType.equals(Type.STRING))
return unionSchemaPos;
- else if (valueType.equals("HeapByteBuffer") && schemaName.equals(Type.STRING.name().toLowerCase()))
+ else if (pValue instanceof ByteBuffer && schemaType.equals(Type.BYTES))
return unionSchemaPos;
- else if (valueType.equals("Integer") && schemaName.equals(Type.INT.name().toLowerCase()))
+ else if (pValue instanceof Integer && schemaType.equals(Type.INT))
return unionSchemaPos;
- else if (valueType.equals("Long") && schemaName.equals(Type.LONG.name().toLowerCase()))
+ else if (pValue instanceof Long && schemaType.equals(Type.LONG))
return unionSchemaPos;
- else if (valueType.equals("Double") && schemaName.equals(Type.DOUBLE.name().toLowerCase()))
+ else if (pValue instanceof Double && schemaType.equals(Type.DOUBLE))
return unionSchemaPos;
- else if (valueType.equals("Float") && schemaName.equals(Type.FLOAT.name().toLowerCase()))
+ else if (pValue instanceof Float && schemaType.equals(Type.FLOAT))
return unionSchemaPos;
- else if (valueType.equals("Boolean") && schemaName.equals(Type.BOOLEAN.name().toLowerCase()))
+ else if (pValue instanceof Boolean && schemaType.equals(Type.BOOLEAN))
+ return unionSchemaPos;
+ else if (pValue instanceof Map && schemaType.equals(Type.MAP))
+ return unionSchemaPos;
+ else if (pValue instanceof List && schemaType.equals(Type.ARRAY))
+ return unionSchemaPos;
+ else if (pValue instanceof Persistent && schemaType.equals(Type.RECORD))
return unionSchemaPos;
unionSchemaPos ++;
}
// if we weren't able to determine which data type it is, then we return the default
- return 0;
+ return DEFAULT_UNION_SCHEMA;
}
/**
- * Checks to see if a Cassandra Keyspace actually exists.
- * Returns true if it does.
+ * Simple method to check if a Cassandra Keyspace exists.
+ * @return true if a Keyspace exists.
*/
@Override
public boolean schemaExists() {
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java Sat Apr 12 19:21:53 2014
@@ -27,7 +27,6 @@ import me.prettyprint.cassandra.serializ
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.HSuperColumn;
import me.prettyprint.hector.api.factory.HFactory;
-import me.prettyprint.hector.api.mutation.MutationResult;
import me.prettyprint.hector.api.mutation.Mutator;
import org.apache.gora.persistency.Persistent;
@@ -39,7 +38,6 @@ import org.apache.gora.persistency.Persi
*/
public class HectorUtils<K,T extends Persistent> {
- /** Methods to insert columns. */
public static<K> void insertColumn(Mutator<K> mutator, K key, String columnFamily, ByteBuffer columnName, ByteBuffer columnValue, String ttlAttr) {
mutator.insert(key, columnFamily, createColumn(columnName, columnValue, ttlAttr));
}
@@ -48,7 +46,7 @@ public class HectorUtils<K,T extends Per
mutator.insert(key, columnFamily, createColumn(columnName, columnValue, ttlAttr));
}
- /** Methods to create columns. */
+
public static<K> HColumn<ByteBuffer,ByteBuffer> createColumn(ByteBuffer name, ByteBuffer value, String ttlAttr) {
return HFactory.createColumn(name, value, ByteBufferSerializer.get(), ByteBufferSerializer.get()).setTtl(Integer.parseInt(ttlAttr));
}
@@ -61,7 +59,7 @@ public class HectorUtils<K,T extends Per
return HFactory.createColumn(name, value, IntegerSerializer.get(), ByteBufferSerializer.get()).setTtl(Integer.parseInt(ttlAttr));
}
- /** Methods to create subColumns. */
+
public static<K> void insertSubColumn(Mutator<K> mutator, K key, String columnFamily, String superColumnName, ByteBuffer columnName, ByteBuffer columnValue, String ttlAttr) {
mutator.insert(key, columnFamily, createSuperColumn(superColumnName, columnName, columnValue, ttlAttr));
}
@@ -74,30 +72,25 @@ public class HectorUtils<K,T extends Per
mutator.insert(key, columnFamily, createSuperColumn(superColumnName, columnName, columnValue, ttlAttr));
}
- /** Methods to delete subColumns. */
+
public static<K> void deleteSubColumn(Mutator<K> mutator, K key, String columnFamily, String superColumnName, ByteBuffer columnName) {
mutator.subDelete(key, columnFamily, superColumnName, columnName, StringSerializer.get(), ByteBufferSerializer.get());
}
- /** Methods do delete columns. */
public static<K> void deleteColumn(Mutator<K> mutator, K key, String columnFamily, ByteBuffer columnName){
- MutationResult mr = mutator.delete(key, columnFamily, columnName, ByteBufferSerializer.get());
- System.out.println(mr.toString());
+ mutator.delete(key, columnFamily, columnName, ByteBufferSerializer.get());
}
- /** Methods to create superColumns. */
- @SuppressWarnings("unchecked")
public static<K> HSuperColumn<String,ByteBuffer,ByteBuffer> createSuperColumn(String superColumnName, ByteBuffer columnName, ByteBuffer columnValue, String ttlAttr) {
return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue, ttlAttr)), StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get());
}
- @SuppressWarnings("unchecked")
public static<K> HSuperColumn<String,String,ByteBuffer> createSuperColumn(String superColumnName, String columnName, ByteBuffer columnValue, String ttlAttr) {
return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue, ttlAttr)), StringSerializer.get(), StringSerializer.get(), ByteBufferSerializer.get());
}
- @SuppressWarnings("unchecked")
public static<K> HSuperColumn<String,Integer,ByteBuffer> createSuperColumn(String superColumnName, Integer columnName, ByteBuffer columnValue, String ttlAttr) {
return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue, ttlAttr)), StringSerializer.get(), IntegerSerializer.get(), ByteBufferSerializer.get());
}
+
}
Modified: gora/trunk/gora-cassandra/src/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/test/conf/cassandra.yaml?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/test/conf/cassandra.yaml (original)
+++ gora/trunk/gora-cassandra/src/test/conf/cassandra.yaml Sat Apr 12 19:21:53 2014
@@ -7,7 +7,7 @@
# The name of the cluster. This is mainly used to prevent machines in
# one logical cluster from joining another.
-cluster_name: 'Gora Cassandra Test Cluster'
+cluster_name: "Gora Cassandra Test Cluster"
# You should always specify InitialToken when setting up a production
# cluster for the first time, and often when adding capacity later.
@@ -27,13 +27,13 @@ hinted_handoff_enabled: true
# generated. After it has been dead this long, hints will be dropped.
max_hint_window_in_ms: 3600000 # one hour
# Sleep this long after delivering each row or row fragment
-hinted_handoff_throttle_delay_in_ms: 50
+#hinted_handoff_throttle_delay_in_ms: 50 (deprecated in 2.0.2)
# authentication backend, implementing IAuthenticator; used to identify users
authenticator: org.apache.cassandra.auth.AllowAllAuthenticator
# authorization backend, implementing IAuthority; used to limit access/provide permissions
-authority: org.apache.cassandra.auth.AllowAllAuthority
+authorizer: org.apache.cassandra.auth.AllowAllAuthorizer
# The partitioner is responsible for distributing rows (by key) across
# nodes in the cluster. Any IPartitioner may be used, including your
@@ -109,7 +109,7 @@ seed_provider:
# it is most effective under light to moderate load, or read-heavy
# workloads; under truly massive write load, it will often be too
# little, too late.
-flush_largest_memtables_at: 0.75
+#flush_largest_memtables_at: 0.75 (deprecated in 2.0.2)
# emergency pressure valve #2: the first time heap usage after a full
# (CMS) garbage collection is above this fraction of the max,
@@ -120,8 +120,8 @@ flush_largest_memtables_at: 0.75
#
# Set to 1.0 to disable. Setting this lower than
# CMSInitiatingOccupancyFraction is not likely to be useful.
-reduce_cache_sizes_at: 0.85
-reduce_cache_capacity_to: 0.6
+#reduce_cache_sizes_at: 0.85 (deprecated in 2.0.2)
+#reduce_cache_capacity_to: 0.6 (deprecated in 2.0.2)
# For workloads with more data than can fit in memory, Cassandra's
# bottleneck will be reads that need to fetch data from
@@ -311,7 +311,7 @@ compaction_preheat_key_cache: true
# stream_throughput_outbound_megabits_per_sec: 400
# Time to wait for a reply from other nodes before failing the command
-rpc_timeout_in_ms: 10000
+#rpc_timeout_in_ms: 10000 (deprecated in 2.0.2)
# phi value that must be reached for a host to be marked down.
# most users should never need to adjust this.
@@ -410,7 +410,7 @@ index_interval: 128
# The passwords used in these options must match the passwords used when generating
# the keystore and truststore. For instructions on generating these files, see:
# http://download.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore
-encryption_options:
+server_encryption_options:
internode_encryption: none
keystore: conf/.keystore
keystore_password: cassandra
Modified: gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml (original)
+++ gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml Sat Apr 12 19:21:53 2014
@@ -47,7 +47,8 @@
<field name="content" family="p" qualifier="p:cnt:c" ttl="10"/>
<field name="parsedContent" family="sc" qualifier="p:parsedContent" ttl="10"/>
<field name="outlinks" family="sc" qualifier="p:outlinks" ttl="10"/>
- <field name="metadata" family="sc" qualifier="c:mt" ttl="10"/>
+ <field name="headers" family="sc" qualifier="p:headers" ttl="10"/>
+ <field name="metadata" family="p" qualifier="c:mt" ttl="10"/>
</class>
<class name="org.apache.gora.examples.generated.TokenDatum" keyClass="java.lang.String" keyspace="TokenDatum">
Modified: gora/trunk/gora-cassandra/src/test/conf/gora.properties
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/test/conf/gora.properties?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/test/conf/gora.properties (original)
+++ gora/trunk/gora-cassandra/src/test/conf/gora.properties Sat Apr 12 19:21:53 2014
@@ -14,10 +14,16 @@
# limitations under the License.
gora.datastore.default=org.apache.gora.cassandra.CassandraStore
-gora.cassandrastore.cluster=Test Cluster
-gora.cassandrastore.host=localhost
-# property is annotated in CassandraClient#checkKeyspace()
-# options are ANY, ONE, TWO, THREE, LOCAL_QUORUM, EACH_QUORUM, QUORUM and ALL.
-gora.cassandrastore.cf.consistency.level=ONE
-gora.cassandrastore.read.consistency.level=QUORUM
-gora.cassandrastore.write.consistency.level=ONE
+gora.cassandrastore.keyspace=
+gora.cassandrastore.name=
+gora.cassandrastore.class=
+gora.cassandrastore.qualifier=
+gora.cassandrastore.family=
+gora.cassandrastore.type=
+gora.cassandraStore.cluster=Test Cluster
+gora.cassandraStore.host=localhost
+
+
+
+
+
Modified: gora/trunk/gora-cassandra/src/test/conf/log4j-server.properties
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/test/conf/log4j-server.properties?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/test/conf/log4j-server.properties (original)
+++ gora/trunk/gora-cassandra/src/test/conf/log4j-server.properties Sat Apr 12 19:21:53 2014
@@ -42,3 +42,6 @@ log4j.appender.R.File=/var/log/cassandra
# Adding this to avoid thrift logging disconnect errors.
log4j.logger.org.apache.thrift.server.TNonblockingServer=ERROR
+# Add for gora-cassandra specific logging during tests
+log4j.logger.org.apache.gora.cassandra=DEBUG
+
Modified: gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/GoraCassandraTestDriver.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/GoraCassandraTestDriver.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/GoraCassandraTestDriver.java (original)
+++ gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/GoraCassandraTestDriver.java Sat Apr 12 19:21:53 2014
@@ -23,17 +23,13 @@
package org.apache.gora.cassandra;
-import java.io.IOException;
-
import org.apache.gora.GoraTestDriver;
import org.apache.gora.cassandra.store.CassandraStore;
-import org.apache.hadoop.conf.Configuration;
-
import java.io.File;
import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.thrift.CassandraDaemon;
+import org.apache.cassandra.service.CassandraDaemon;
// Logging imports
import org.slf4j.Logger;
@@ -70,7 +66,7 @@ public class GoraCassandraTestDriver ext
}
/**
- * starts embedded Cassandra server.
+ * Starts embedded Cassandra server.
*
* @throws Exception
* if an error occurs
@@ -91,21 +87,21 @@ public class GoraCassandraTestDriver ext
public void run() {
try {
- cassandraDaemon.start();
- } catch (Exception e) {
- log.error("Embedded casandra server run failed!", e);
- }
+ cassandraDaemon.start();
+ } catch (Exception e) {
+ log.error("Embedded casandra server run failed!", e);
+ }
}
});
cassandraThread.setDaemon(true);
cassandraThread.start();
- } catch (Exception e) {
- log.error("Embedded casandra server start failed!", e);
+ } catch (Exception e) {
+ log.error("Embedded casandra server start failed!", e);
- // cleanup
- tearDownClass();
- }
+ // cleanup
+ tearDownClass();
+ }
}
/**
@@ -136,15 +132,15 @@ public class GoraCassandraTestDriver ext
int tries = 3;
while (tries-- > 0) {
try {
- cleanupDirectories();
- break;
+ cleanupDirectories();
+ break;
} catch (Exception e) {
- // ignore exception
- try {
- Thread.sleep(250);
- } catch (InterruptedException e1) {
- // ignore exception
- }
+ // ignore exception
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e1) {
+ // ignore exception
+ }
}
}
}
Modified: gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java Sat Apr 12 19:21:53 2014
@@ -68,49 +68,37 @@ public class TestCassandraStore extends
}
- //We need to skip the following tests for a while until we fix some issues..
-
- @Ignore("skipped until some bugs are fixed")
- @Override
- public void testGetWebPageDefaultFields() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
@Override
public void testQuery() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
@Override
public void testQueryStartKey() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
@Override
public void testQueryEndKey() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
@Override
public void testQueryKeyRange() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
+ @Override
+ public void testQueryWebPageSingleKey() throws IOException {}
+ @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
@Override
public void testQueryWebPageSingleKeyDefaultFields() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
+ @Override
+ public void testQueryWebPageQueryEmptyResults() throws IOException {}
+ @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented at CassandraStore, and always returns false or 0")
@Override
public void testDelete() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented at CassandraStore, and always returns false or 0")
@Override
public void testDeleteByQuery() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented at CassandraStore, and always returns false or 0")
@Override
public void testDeleteByQueryFields() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-298 Implement CassandraStore#getPartitions")
@Override
public void testGetPartitions() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
- @Override
- public void testGetRecursive() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
- @Override
- public void testGetDoubleRecursive() throws IOException{}
- @Ignore("skipped until some bugs are fixed")
- @Override
- public void testGetNested() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
- @Override
- public void testGet3UnionField() throws IOException {}
-
}
Added: gora/trunk/gora-compiler-cli/pom.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-compiler-cli/pom.xml?rev=1586888&view=auto
==============================================================================
--- gora/trunk/gora-compiler-cli/pom.xml (added)
+++ gora/trunk/gora-compiler-cli/pom.xml Sat Apr 12 19:21:53 2014
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora</artifactId>
+ <version>0.4-SNAPSHOT</version>
+ <relativePath>../</relativePath>
+ </parent>
+ <artifactId>gora-compiler-cli</artifactId>
+ <packaging>bundle</packaging>
+
+ <name>Apache Gora :: Compiler-CLI</name>
+ <url>http://gora.apache.org</url>
+ <description>The Apache Gora open source framework provides an in-memory data model and
+ persistence for big data. Gora supports persisting to column stores, key value stores,
+ document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce
+ support.</description>
+ <inceptionYear>2010</inceptionYear>
+ <organization>
+ <name>The Apache Software Foundation</name>
+ <url>http://www.apache.org/</url>
+ </organization>
+ <scm>
+ <url>http://svn.apache.org/viewvc/gora/trunk/gora-compiler-cli/</url>
+ <connection>scm:svn:http://svn.apache.org/repos/asf/gora/trunk/gora-compiler-cli/</connection>
+ <developerConnection>scm:svn:https://svn.apache.org/repos/asf/gora/trunk/gora-compiler-cli/</developerConnection>
+ </scm>
+ <issueManagement>
+ <system>JIRA</system>
+ <url>https://issues.apache.org/jira/browse/GORA</url>
+ </issueManagement>
+ <ciManagement>
+ <system>Jenkins</system>
+ <url>https://builds.apache.org/job/Gora-trunk/</url>
+ </ciManagement>
+
+ <properties>
+ <osgi.import>*</osgi.import>
+ <osgi.export>org.apache.gora.compiler.cli*;version="${project.version}";-noimport:=true</osgi.export>
+ </properties>
+ <dependencies>
+ <dependency>
+ <artifactId>gora-compiler</artifactId>
+ <groupId>org.apache.gora</groupId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </dependency>
+ </dependencies>
+</project>
Propchange: gora/trunk/gora-compiler-cli/pom.xml
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: gora/trunk/gora-compiler-cli/src/main/java/org/apache/gora/compiler/cli/GoraCompilerCLI.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-compiler-cli/src/main/java/org/apache/gora/compiler/cli/GoraCompilerCLI.java?rev=1586888&view=auto
==============================================================================
--- gora/trunk/gora-compiler-cli/src/main/java/org/apache/gora/compiler/cli/GoraCompilerCLI.java (added)
+++ gora/trunk/gora-compiler-cli/src/main/java/org/apache/gora/compiler/cli/GoraCompilerCLI.java Sat Apr 12 19:21:53 2014
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.compiler.cli;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.gora.compiler.GoraCompiler;
+
+
+public class GoraCompilerCLI {
+
+ public static void main(String[] args) {
+ if(args.length == 1 && (args[0].equals("--help") || args[0].equals("-h"))){
+ printHelp();
+ System.exit(0);
+ }
+ if(args.length < 2){
+ System.err.println("Must supply at least one source file and an output directory.");
+ printHelp();
+ System.exit(1);
+ }
+ File outputDir = new File(args[args.length-1]);
+ if(!outputDir.isDirectory()){
+ System.err.println("Must supply a directory for output");
+ printHelp();
+ System.exit(1);
+ }
+ File[] inputs = new File[args.length-1];
+ for(int i = 0; i<inputs.length; i++){
+ inputs[i] = new File(args[i]);
+ if(!inputs[i].isFile()){
+ System.err.println("Input must be a file.");
+ printHelp();
+ System.exit(1);
+ }
+ }
+ try {
+ GoraCompiler.compileSchema(inputs, outputDir);
+ System.out.println("Compiler executed SUCCESSFULLY.");
+ } catch (IOException e) {
+ System.err.println("Error while compiling schema files. Check that the schemas are properly formatted.");
+ printHelp();
+ e.printStackTrace(System.err);
+ System.exit(1); // signal the failure to calling scripts instead of exiting 0
+ }
+ }
+
+ private static void printHelp() {
+ PrintStream out = System.out;
+ out.println("Usage: gora-compiler ( -h | --help ) | (<input> [<input>...] <output>)");
+ }
+}
Propchange: gora/trunk/gora-compiler-cli/src/main/java/org/apache/gora/compiler/cli/GoraCompilerCLI.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: gora/trunk/gora-compiler/pom.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-compiler/pom.xml?rev=1586888&view=auto
==============================================================================
--- gora/trunk/gora-compiler/pom.xml (added)
+++ gora/trunk/gora-compiler/pom.xml Sat Apr 12 19:21:53 2014
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora</artifactId>
+ <version>0.4-SNAPSHOT</version>
+ <relativePath>../</relativePath>
+ </parent>
+ <artifactId>gora-compiler</artifactId>
+ <packaging>bundle</packaging>
+
+ <name>Apache Gora :: Compiler</name>
+ <url>http://gora.apache.org</url>
+ <description>The Apache Gora open source framework provides an in-memory data model and
+ persistence for big data. Gora supports persisting to column stores, key value stores,
+ document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce
+ support.</description>
+ <inceptionYear>2010</inceptionYear>
+ <organization>
+ <name>The Apache Software Foundation</name>
+ <url>http://www.apache.org/</url>
+ </organization>
+ <scm>
+ <url>http://svn.apache.org/viewvc/gora/trunk/gora-compiler/</url>
+ <connection>scm:svn:http://svn.apache.org/repos/asf/gora/trunk/gora-compiler/</connection>
+ <developerConnection>scm:svn:https://svn.apache.org/repos/asf/gora/trunk/gora-compiler/</developerConnection>
+ </scm>
+ <issueManagement>
+ <system>JIRA</system>
+ <url>https://issues.apache.org/jira/browse/GORA</url>
+ </issueManagement>
+ <ciManagement>
+ <system>Jenkins</system>
+ <url>https://builds.apache.org/job/Gora-trunk/</url>
+ </ciManagement>
+
+ <properties>
+ <osgi.import>*</osgi.import>
+ <osgi.export>org.apache.gora.compiler*;version="${project.version}";-noimport:=true</osgi.export>
+ </properties>
+ <build>
+ <directory>target</directory>
+ <outputDirectory>target/classes</outputDirectory>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ <testOutputDirectory>target/test-classes</testOutputDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testResources>
+ <testResource>
+ <directory>src/test/conf/</directory>
+ <includes>
+ <include>**</include>
+ </includes>
+ </testResource>
+ </testResources>
+ <resources>
+ <resource>
+ <directory>src/main/velocity</directory>
+ <includes>
+ <include>**/*.vm</include>
+ </includes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.gora.compiler.GoraCompiler</mainClass>
+ <packageName>org.apache.gora.compiler</packageName>
+ </manifest>
+ </archive>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-compiler</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
Propchange: gora/trunk/gora-compiler/pom.xml
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: gora/trunk/gora-compiler/src/main/java/org/apache/gora/compiler/GoraCompiler.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-compiler/src/main/java/org/apache/gora/compiler/GoraCompiler.java?rev=1586888&view=auto
==============================================================================
--- gora/trunk/gora-compiler/src/main/java/org/apache/gora/compiler/GoraCompiler.java (added)
+++ gora/trunk/gora-compiler/src/main/java/org/apache/gora/compiler/GoraCompiler.java Sat Apr 12 19:21:53 2014
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.compiler;
+
+import java.beans.PersistenceDelegate;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.compiler.specific.SpecificCompiler;
+import org.apache.avro.generic.GenericData.StringType;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+
+public class GoraCompiler extends SpecificCompiler {
+
+ public static String DIRTY_BYTES_FIELD_NAME = "__g__dirty";
+ public static final int FIRST_UNMANAGED_FIELD_INDEX = 1;
+
+ private static final Set<String> GORA_RESERVED_NAMES = new HashSet<String>();
+
+ static {
+ GORA_RESERVED_NAMES.addAll(Arrays.asList(new String[] { DIRTY_BYTES_FIELD_NAME }));
+ }
+
+ private static final Set<String> GORA_HIDDEN_FIELD_NAMES = new HashSet<String>();
+
+ static {
+ GORA_HIDDEN_FIELD_NAMES.add(DIRTY_BYTES_FIELD_NAME);
+ }
+
+ public static void compileSchema(File[] srcFiles, File dest)
+ throws IOException {
+ Schema.Parser parser = new Schema.Parser();
+
+ for (File src : srcFiles) {
+ System.out.println("Compiling: " + src.getAbsolutePath());
+ Schema originalSchema = parser.parse(src);
+ Map<Schema,Schema> queue = new HashMap<Schema,Schema>();
+ Schema newSchema = getSchemaWithDirtySupport(originalSchema, queue);
+ GoraCompiler compiler = new GoraCompiler(newSchema);
+ compiler.setTemplateDir("/org/apache/gora/compiler/templates/");
+ compiler.compileToDestination(src, dest);
+ System.out.println("Compiled into: " + dest.getAbsolutePath());
+ }
+ }
+
+ public static String generateAppropriateImmutabilityModifier(Schema schema){
+ switch (schema.getType()) {
+ case BYTES:
+ return ".asReadOnlyBuffer()";
+ default:
+ return "";
+ }
+ }
+
+ public static String generateAppropriateWrapperOrValue(Schema schema) {
+ switch (schema.getType()) {
+ case MAP:
+ return "(value instanceof org.apache.gora.persistency.Dirtyable) ? "
+ + "value : new org.apache.gora.persistency.impl.DirtyMapWrapper(value)";
+ case ARRAY:
+ return "(value instanceof org.apache.gora.persistency.Dirtyable) ? "
+ + "value : new org.apache.gora.persistency.impl.DirtyListWrapper(value)";
+ case BYTES:
+ return "deepCopyToReadOnlyBuffer(value)";
+ default:
+ return "value";
+ }
+ }
+
+ public static String generateAppropriateWrapperOrValueForPut(Schema schema) {
+ switch (schema.getType()) {
+ case MAP:
+ return "(value instanceof org.apache.gora.persistency.Dirtyable) ? "
+ + "value : new org.apache.gora.persistency.impl.DirtyMapWrapper((java.util.Map)value)";
+ case ARRAY:
+ return "(value instanceof org.apache.gora.persistency.Dirtyable) ? "
+ + "value : new org.apache.gora.persistency.impl.DirtyListWrapper((java.util.List)value)";
+ default:
+ return "value";
+ }
+ }
+
+ public static String generateAppropriateWrapper(Schema schema, Field field) {
+ // Compare by value; == only holds when both sides are the same interned String.
+ if (DIRTY_BYTES_FIELD_NAME.equals(field.name())) {
+ return "java.nio.ByteBuffer.wrap(new byte["
+ + getNumberOfBytesNeededForDirtyBits(schema) + "])";
+ } else {
+ switch (field.schema().getType()) {
+ case RECORD:
+ return field.schema().getName()+".newBuilder().build()";
+ case MAP:
+ return "new org.apache.gora.persistency.impl.DirtyMapWrapper((java.util.Map)defaultValue(fields()["+field.pos()+"]))";
+ case ARRAY:
+ return "new org.apache.gora.persistency.impl.DirtyListWrapper((java.util.List)defaultValue(fields()["+field.pos()+"]))";
+ default:
+ return "defaultValue(fields()["+field.pos()+"])";
+ }
+ }
+ }
+
+ public static String generateAppropriateValue(Field field) {
+ switch (field.schema().getType()) {
+ case RECORD:
+ return field.schema().getName()+".newBuilder().build()";
+ case MAP:
+ return "new org.apache.gora.persistency.impl.DirtyMapWrapper(new java.util.HashMap())";
+ case ARRAY:
+ return "new org.apache.gora.persistency.impl.DirtyListWrapper(new java.util.ArrayList())";
+ default:
+ return "this."+field.name();
+ }
+ }
+
+ /** Recognizes camel case */
+ public static String toUpperCase(String s) {
+ StringBuilder builder = new StringBuilder();
+
+ for (int i = 0; i < s.length(); i++) {
+ if (i > 0) {
+ if (Character.isUpperCase(s.charAt(i))
+ && Character.isLowerCase(s.charAt(i - 1))
+ && Character.isLetter(s.charAt(i))) {
+ builder.append("_");
+ }
+ }
+ builder.append(Character.toUpperCase(s.charAt(i)));
+ }
+
+ return builder.toString();
+ }
+
+ private static int getNumberOfBytesNeededForDirtyBits(Schema originalSchema) {
+ return (int) Math.ceil((originalSchema.getFields().size() + 1) * 0.125);
+ }
+
+ public static String generateDirtyMethod(Schema schema, Field field) {
+ /*
+ * TODO: See AVRO-1127. This is dirty. We need to file a bug in avro to
+ * get them to open the API so other compilers can use their utility
+ * methods
+ */
+ String getMethod = generateGetMethod(schema, field);
+ String dirtyMethod = "is" + getMethod.substring(3) + "Dirty";
+ return dirtyMethod;
+ }
+
+ public static String generateDefaultValueString(Schema schema, String fieldName) {
+ // Compare by value: == only matches when the caller passes the interned literal.
+ if (!DIRTY_BYTES_FIELD_NAME.equals(fieldName)) {
+ throw new IllegalArgumentException(fieldName
+ + " is not a gora managed field.");
+ }
+ return "java.nio.ByteBuffer.wrap(new byte["
+ + getNumberOfBytesNeededForDirtyBits(schema) + "])";
+ }
+
+ public static boolean isNotHiddenField(String fieldName) {
+ return !GORA_HIDDEN_FIELD_NAMES.contains(fieldName);
+ }
+
+ GoraCompiler(Schema schema) {
+ super(schema);
+ }
+
+ private static Schema getSchemaWithDirtySupport(Schema originalSchema, Map<Schema,Schema> queue) throws IOException {
+ switch (originalSchema.getType()) {
+ case RECORD:
+ if (queue.containsKey(originalSchema)) {
+ return queue.get(originalSchema);
+ }
+ return getRecordSchemaWithDirtySupport(originalSchema,queue);
+ case UNION:
+ return getUnionSchemaWithDirtySupport(originalSchema,queue);
+ case MAP:
+ return getMapSchemaWithDirtySupport(originalSchema,queue);
+ case ARRAY:
+ return getArraySchemaWithDirtySupport(originalSchema,queue);
+ default:
+ return originalSchema;
+ }
+ }
+
+ private static Schema getArraySchemaWithDirtySupport(Schema originalSchema, Map<Schema,Schema> queue) throws IOException {
+ return Schema.createArray(getSchemaWithDirtySupport(originalSchema.getElementType(),queue));
+ }
+
+ private static Schema getMapSchemaWithDirtySupport(Schema originalSchema, Map<Schema,Schema> queue) throws IOException {
+ return Schema.createMap(getSchemaWithDirtySupport(originalSchema.getValueType(),queue));
+ }
+
+ private static Schema getUnionSchemaWithDirtySupport(Schema originalSchema, Map<Schema,Schema> queue) throws IOException {
+ List<Schema> schemaTypes = originalSchema.getTypes();
+ List<Schema> newTypeSchemas = new ArrayList<Schema>();
+ for (int i = 0; i < schemaTypes.size(); i++) {
+ Schema currentTypeSchema = schemaTypes.get(i);
+ newTypeSchemas.add(getSchemaWithDirtySupport(currentTypeSchema,queue));
+ }
+ return Schema.createUnion(newTypeSchemas);
+ }
+
+ private static Schema getRecordSchemaWithDirtySupport(Schema originalSchema, Map<Schema,Schema> queue) throws IOException {
+ if (originalSchema.getType() != Type.RECORD) {
+ throw new IOException("Gora only supports record schemas.");
+ }
+ List<Field> originalFields = originalSchema.getFields();
+ /* make sure the schema doesn't contain the field __g__dirty */
+ for (Field field : originalFields) {
+ if (GORA_RESERVED_NAMES.contains(field.name())) {
+ throw new IOException(
+ "Gora schemas cannot contain the field name " + field.name());
+ }
+ }
+ Schema newSchema = Schema.createRecord(originalSchema.getName(),
+ originalSchema.getDoc(), originalSchema.getNamespace(),
+ originalSchema.isError());
+
+ queue.put(originalSchema, newSchema);
+
+ List<Field> newFields = new ArrayList<Schema.Field>();
+ byte[] defaultDirtyBytesValue = new byte[getNumberOfBytesNeededForDirtyBits(originalSchema)];
+ Arrays.fill(defaultDirtyBytesValue, (byte) 0);
+ JsonNode defaultDirtyJsonValue = JsonNodeFactory.instance
+ .binaryNode(defaultDirtyBytesValue);
+ Field dirtyBits = new Field(DIRTY_BYTES_FIELD_NAME,
+ Schema.create(Type.BYTES),
+ "Bytes used to represent weather or not a field is dirty.",
+ defaultDirtyJsonValue);
+ newFields.add(dirtyBits);
+ for (Field originalField : originalFields) {
+ // recursively add dirty support
+ Field newField = new Field(originalField.name(),
+ getSchemaWithDirtySupport(originalField.schema(),queue),
+ originalField.doc(), originalField.defaultValue(),
+ originalField.order());
+ newFields.add(newField);
+ }
+ newSchema.setFields(newFields);
+ return newSchema;
+ }
+
+}
Propchange: gora/trunk/gora-compiler/src/main/java/org/apache/gora/compiler/GoraCompiler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: gora/trunk/gora-compiler/src/main/velocity/org/apache/gora/compiler/templates/record.vm
URL: http://svn.apache.org/viewvc/gora/trunk/gora-compiler/src/main/velocity/org/apache/gora/compiler/templates/record.vm?rev=1586888&view=auto
==============================================================================
--- gora/trunk/gora-compiler/src/main/velocity/org/apache/gora/compiler/templates/record.vm (added)
+++ gora/trunk/gora-compiler/src/main/velocity/org/apache/gora/compiler/templates/record.vm Sat Apr 12 19:21:53 2014
@@ -0,0 +1,349 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements. See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership. The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+##
+#if ($schema.getNamespace())
+package $schema.getNamespace();
+#end
+@SuppressWarnings("all")
+#if ($schema.getDoc())
+/** $schema.getDoc() */
+#end
+public class ${this.mangle($schema.getName())}#if ($schema.isError()) extends org.apache.avro.specific.SpecificExceptionBase#else extends org.apache.gora.persistency.impl.PersistentBase#end implements org.apache.avro.specific.SpecificRecord, org.apache.gora.persistency.Persistent {
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("${this.javaEscape($schema.toString())}");
+
+ /**
+ * Enum containing all data bean's fields.
+ * One constant is generated per schema field, in the order the schema lists them.
+ * Each constant carries a zero-based index and the mangled field name.
+ */
+ public static enum Field {
+#set ($i = 0)
+#foreach ($field in $schema.getFields())
+ ${this.toUpperCase($field.name())}($i, "${this.mangle($field.name(), $schema.isError())}"),
+#set ($i = $i + 1)
+#end
+ // Every generated constant ends with a comma; Java accepts a trailing comma before this terminating semicolon.
+ ;
+ /**
+ * Field's index: its zero-based position among the generated constants.
+ */
+ private int index;
+
+ /**
+ * Field's name.
+ */
+ private String name;
+
+ /**
+ * Field's constructor
+ * @param index field's index.
+ * @param name field's name.
+ */
+ Field(int index, String name) {this.index=index;this.name=name;}
+
+ /**
+ * Gets field's index.
+ * @return int field's index.
+ */
+ public int getIndex() {return index;}
+
+ /**
+ * Gets field's name.
+ * @return String field's name.
+ */
+ public String getName() {return name;}
+
+ /**
+ * Returns the field's name, the same value as getName().
+ * @return String field's name.
+ */
+ public String toString() {return name;}
+ };
+
+ public static final String[] _ALL_FIELDS = {
+#foreach ($field in $schema.getFields())
+ "${this.mangle($field.name(), $schema.isError())}",
+#end
+ };
+
+#foreach ($field in $schema.getFields())
+#if ($field.doc())
+ /** $field.doc() */
+#end
+ private ${this.javaUnbox($field.schema())} ${this.mangle($field.name(), $schema.isError())}#if(! $this.isNotHiddenField($field.name()) ) = ${this.generateDefaultValueString($schema,$field.name())}#end;
+#end
+#if ($schema.isError())
+
+ public ${this.mangle($schema.getName())}() {
+ super();
+ }
+
+ public ${this.mangle($schema.getName())}(Object value) {
+ super(value);
+ }
+
+ public ${this.mangle($schema.getName())}(Throwable cause) {
+ super(cause);
+ }
+
+ public ${this.mangle($schema.getName())}(Object value, Throwable cause) {
+ super(value, cause);
+ }
+
+#end
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call.
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+#set ($i = 0)
+#foreach ($field in $schema.getFields())
+ case $i: return ${this.mangle($field.name(), $schema.isError())};
+#set ($i = $i + 1)
+#end
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ // Used by DatumReader. Applications should not call.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value) {
+ switch (field$) {
+#set ($i = 0)
+#foreach ($field in $schema.getFields())
+ case $i: ${this.mangle($field.name(), $schema.isError())} = (${this.javaType($field.schema())})(${this.generateAppropriateWrapperOrValueForPut($field.schema())}); break;
+#set ($i = $i + 1)
+#end
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+#foreach ($field in $schema.getFields())
+#if ($this.isNotHiddenField($field.name()))
+ /**
+ * Gets the value of the '${this.mangle($field.name(), $schema.isError())}' field.
+#if ($field.doc()) * $field.doc()#end
+ */
+ public ${this.javaType($field.schema())} ${this.generateGetMethod($schema, $field)}() {
+ return ${this.mangle($field.name(), $schema.isError())}${this.generateAppropriateImmutabilityModifier($field.schema())};
+ }
+
+ /**
+ * Sets the value of the '${this.mangle($field.name(), $schema.isError())}' field.
+ * The field is also flagged as dirty through setDirty with this field's position.
+#if ($field.doc()) * $field.doc()#end
+ * @param value the value to set.
+ */
+ public void ${this.generateSetMethod($schema, $field)}(${this.javaType($field.schema())} value) {
+ // The stored expression is chosen by the compiler per schema type; presumably it wraps or copies value, confirm in GoraCompiler.
+ this.${this.mangle($field.name(), $schema.isError())} = ${this.generateAppropriateWrapperOrValue($field.schema())};
+ setDirty(${field.pos()});
+ }
+
+ /**
+ * Checks the dirty status of the '${this.mangle($field.name(), $schema.isError())}' field. A field is dirty if it represents a change that has not yet been written to the database.
+#if ($field.doc()) * $field.doc()#end
+ * @param value ignored; the result depends only on the dirty flag stored for this field's position.
+ * @return the result of isDirty for this field's position.
+ */
+ public boolean ${this.generateDirtyMethod($schema, $field)}(${this.javaType($field.schema())} value) {
+ return isDirty(${field.pos()});
+ }
+
+#end
+#end
+ /** Creates a new ${this.mangle($schema.getName())} RecordBuilder */
+ public static #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder newBuilder() {
+ return new #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder();
+ }
+
+ /** Creates a new ${this.mangle($schema.getName())} RecordBuilder by copying an existing Builder */
+ public static #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder newBuilder(#if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder other) {
+ return new #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder(other);
+ }
+
+ /** Creates a new ${this.mangle($schema.getName())} RecordBuilder by copying an existing $this.mangle($schema.getName()) instance */
+ public static #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder newBuilder(#if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())} other) {
+ return new #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder(other);
+ }
+
+ /**
+  * Returns a read-only deep copy of {@code input}.
+  *
+  * All bytes from index 0 up to the capacity are copied into a newly allocated
+  * heap buffer, and the copy is given the same position, limit and (when one is
+  * set) mark as the input. Despite the historical "WriteOnly" in the name, the
+  * returned buffer is read-only; the name is kept so existing generated callers
+  * keep compiling.
+  *
+  * The input buffer itself is never modified: all cursor manipulation happens on
+  * a duplicate that shares the content but has independent position, limit and mark.
+  *
+  * @param input the buffer to copy; must not be null.
+  * @return a read-only buffer with the same content, position, limit and mark.
+  */
+ private static java.nio.ByteBuffer deepCopyToWriteOnlyBuffer(
+     java.nio.ByteBuffer input) {
+   // A duplicate starts with the same position, limit and mark as the input,
+   // so the caller's buffer state is left untouched while we probe and copy.
+   java.nio.ByteBuffer source = input.duplicate();
+   int position = source.position();
+   int limit = source.limit();
+
+   // The mark cannot be queried directly; reset() is the only way to read it,
+   // and it throws when no mark was ever set (e.g. a freshly wrapped array).
+   int mark = -1;
+   try {
+     source.reset();
+     mark = source.position();
+   } catch (java.nio.InvalidMarkException ignored) {
+     // No mark defined on the input: the copy is left without a mark as well.
+   }
+
+   // Expose the whole backing range [0, capacity) for the bulk copy.
+   source.clear();
+   java.nio.ByteBuffer copy = java.nio.ByteBuffer.allocate(source.capacity());
+   copy.put(source);
+   copy.clear();
+
+   // Restore the cursor state; mark first, since mark <= position <= limit.
+   if (mark >= 0) {
+     copy.position(mark);
+     copy.mark();
+   }
+   copy.position(position);
+   copy.limit(limit);
+   return copy.asReadOnlyBuffer();
+ }
+
+ /**
+ * RecordBuilder for ${this.mangle($schema.getName())} instances.
+ */
+ public static class Builder extends#if ($schema.isError()) org.apache.avro.specific.SpecificErrorBuilderBase<${this.mangle($schema.getName())}>#else org.apache.avro.specific.SpecificRecordBuilderBase<${this.mangle($schema.getName())}>#end
+
+ implements#if ($schema.isError()) org.apache.avro.data.ErrorBuilder<${this.mangle($schema.getName())}>#else org.apache.avro.data.RecordBuilder<${this.mangle($schema.getName())}>#end {
+
+#foreach ($field in $schema.getFields())
+ private ${this.javaUnbox($field.schema())} ${this.mangle($field.name(), $schema.isError())};
+#end
+
+ /** Creates a new Builder */
+ private Builder() {
+ super(#if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.SCHEMA$);
+ }
+
+ /** Creates a Builder by copying an existing Builder */
+ private Builder(#if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder other) {
+ super(other);
+ }
+
+ /** Creates a Builder by copying an existing $this.mangle($schema.getName()) instance */
+ private Builder(#if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())} other) {
+ #if ($schema.isError())super(other)#else
+ super(#if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.SCHEMA$)#end;
+#foreach ($field in $schema.getFields())
+ if (isValidValue(fields()[$field.pos()], other.${this.mangle($field.name(), $schema.isError())})) {
+ this.${this.mangle($field.name(), $schema.isError())} = (${this.javaType($field.schema())}) data().deepCopy(fields()[$field.pos()].schema(), other.${this.mangle($field.name(), $schema.isError())});
+ fieldSetFlags()[$field.pos()] = true;
+ }
+#end
+ }
+#if ($schema.isError())
+
+ @Override
+ public #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder setValue(Object value) {
+ super.setValue(value);
+ return this;
+ }
+
+ @Override
+ public #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder clearValue() {
+ super.clearValue();
+ return this;
+ }
+
+ @Override
+ public #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder setCause(Throwable cause) {
+ super.setCause(cause);
+ return this;
+ }
+
+ @Override
+ public #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder clearCause() {
+ super.clearCause();
+ return this;
+ }
+#end
+
+#foreach ($field in $schema.getFields())
+#if ($this.isNotHiddenField($field.name()))
+ /** Gets the value of the '${this.mangle($field.name(), $schema.isError())}' field */
+ public ${this.javaType($field.schema())} ${this.generateGetMethod($schema, $field)}() {
+ return ${this.mangle($field.name(), $schema.isError())};
+ }
+
+ /** Sets the value of the '${this.mangle($field.name(), $schema.isError())}' field */
+ public #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder ${this.generateSetMethod($schema, $field)}(${this.javaUnbox($field.schema())} value) {
+ validate(fields()[$field.pos()], value);
+ this.${this.mangle($field.name(), $schema.isError())} = value;
+ fieldSetFlags()[$field.pos()] = true;
+ return this;
+ }
+
+ /** Checks whether the '${this.mangle($field.name(), $schema.isError())}' field has been set */
+ public boolean ${this.generateHasMethod($schema, $field)}() {
+ return fieldSetFlags()[$field.pos()];
+ }
+
+ /** Clears the value of the '${this.mangle($field.name(), $schema.isError())}' field */
+ public #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder ${this.generateClearMethod($schema, $field)}() {
+#if (${this.isUnboxedJavaTypeNullable($field.schema())})
+ ${this.mangle($field.name(), $schema.isError())} = null;
+#end
+ fieldSetFlags()[$field.pos()] = false;
+ return this;
+ }
+
+#end
+#end
+ /**
+ * Builds a new ${this.mangle($schema.getName())} from this builder.
+ * For every schema field the builder's value is used when its set-flag is true;
+ * otherwise the expression produced by generateAppropriateWrapper is assigned
+ * (presumably the schema default for that field, confirm in GoraCompiler).
+ * Any Exception raised while building is rethrown wrapped in an AvroRuntimeException.
+ */
+ @Override
+ public ${this.mangle($schema.getName())} build() {
+ try {
+ ${this.mangle($schema.getName())} record = new ${this.mangle($schema.getName())}(#if ($schema.isError())getValue(), getCause()#end);
+#foreach ($field in $schema.getFields())
+ record.${this.mangle($field.name(), $schema.isError())} = fieldSetFlags()[$field.pos()] ? this.${this.mangle($field.name(), $schema.isError())} : (${this.javaType($field.schema())}) ${this.generateAppropriateWrapper($schema,$field)};
+#end
+ return record;
+ } catch (Exception e) {
+ throw new org.apache.avro.AvroRuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Returns the tombstone for this record type.
+ * @return the value of the TOMBSTONE constant; the same object on every call.
+ */
+ public ${this.mangle($schema.getName())}.Tombstone getTombstone(){
+ return TOMBSTONE;
+ }
+
+ /**
+ * Creates a fresh ${this.mangle($schema.getName())} by building from a new, untouched Builder.
+ * @return the record returned by newBuilder().build().
+ */
+ public ${this.mangle($schema.getName())} newInstance(){
+ return newBuilder().build();
+ }
+
+ private static final Tombstone TOMBSTONE = new Tombstone();
+
+ /**
+ * Tombstone variant of ${this.mangle($schema.getName())}.
+ * Every generated accessor, mutator and dirty check below is overridden to throw
+ * UnsupportedOperationException. The constructor is private, so instances can only
+ * be created from within the enclosing generated class.
+ */
+ public static final class Tombstone extends ${this.mangle($schema.getName())} implements org.apache.gora.persistency.Tombstone {
+
+ private Tombstone() { }
+
+ #foreach ($field in $schema.getFields())
+ #if ($this.isNotHiddenField($field.name()))
+ /**
+ * Unsupported getter for the '${this.mangle($field.name(), $schema.isError())}' field.
+ #if ($field.doc()) * $field.doc()#end
+ * @throws java.lang.UnsupportedOperationException always.
+ */
+ public ${this.javaType($field.schema())} ${this.generateGetMethod($schema, $field)}() {
+ throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+ }
+
+ /**
+ * Unsupported setter for the '${this.mangle($field.name(), $schema.isError())}' field.
+ #if ($field.doc()) * $field.doc()#end
+ * @param value ignored.
+ * @throws java.lang.UnsupportedOperationException always.
+ */
+ public void ${this.generateSetMethod($schema, $field)}(${this.javaType($field.schema())} value) {
+ throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+ }
+
+ /**
+ * Unsupported dirty check for the '${this.mangle($field.name(), $schema.isError())}' field.
+ #if ($field.doc()) * $field.doc()#end
+ * @param value ignored.
+ * @throws java.lang.UnsupportedOperationException always.
+ */
+ public boolean ${this.generateDirtyMethod($schema, $field)}(${this.javaType($field.schema())} value) {
+ throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+ }
+
+ #end
+ #end
+
+ }
+
+}
\ No newline at end of file
Modified: gora/trunk/gora-core/pom.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/pom.xml?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-core/pom.xml (original)
+++ gora/trunk/gora-core/pom.xml Sat Apr 12 19:21:53 2014
@@ -99,9 +99,9 @@
<version>${build-helper-maven-plugin.version}</version>
<executions>
<execution>
- <phase>generate-sources</phase>
+ <phase>generate-test-sources</phase>
<goals>
- <goal>add-source</goal>
+ <goal>add-test-source</goal>
</goals>
<configuration>
<sources>
@@ -127,14 +127,24 @@
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
+ <groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ </dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
<!-- Logging Dependencies -->
<dependency>
@@ -158,7 +168,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
<dependency>
Modified: gora/trunk/gora-core/src/examples/avro/employee.json
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/avro/employee.json?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/avro/employee.json (original)
+++ gora/trunk/gora-core/src/examples/avro/employee.json Sat Apr 12 19:21:53 2014
@@ -1,30 +1,31 @@
{
"type": "record",
- "name": "Employee",
+ "name": "Employee","default":null,
"namespace": "org.apache.gora.examples.generated",
"fields" : [
- {"name": "name", "type": "string"},
- {"name": "dateOfBirth", "type": "long"},
- {"name": "ssn", "type": "string"},
- {"name": "salary", "type": "int"},
- {"name": "boss", "type":["null","Employee","string"]},
- {"name": "webpage", "type":["null",
+ {"name": "name", "type": ["null","string"],"default":null},
+ {"name": "dateOfBirth", "type": "long","default":0},
+ {"name": "ssn", "type": "string", "default":""},
+ {"name": "salary", "type": "int","default":0},
+ {"name": "boss", "type":["null","Employee","string"],"default":null},
+ {"name": "webpage","default":null, "type":["null",
{
"type": "record",
"name": "WebPage",
"namespace": "org.apache.gora.examples.generated",
"fields" : [
- {"name": "url", "type": "string"},
- {"name": "content", "type": ["null","bytes"]},
- {"name": "parsedContent", "type": {"type":"array", "items": "string"}},
- {"name": "outlinks", "type": {"type":"map", "values":"string"}},
- {"name": "metadata", "type": {
+ {"name": "url", "type": ["null","string"], "default":null},
+ {"name": "content", "type": ["null","bytes"],"default":null},
+ {"name": "parsedContent", "type": {"type":"array", "items": "string"},"default":{}},
+ {"name": "outlinks", "type": {"type":"map", "values":["null", "string"]},"default":{}},
+ {"name": "headers", "type": ["null", {"type": "map", "values": ["null", "string"]}],"default":null},
+ {"name": "metadata", "default":null, "type": {
"name": "Metadata",
"type": "record",
"namespace": "org.apache.gora.examples.generated",
"fields": [
- {"name": "version", "type": "int"},
- {"name": "data", "type": {"type": "map", "values": "string"}}
+ {"name": "version", "type": "int","default":0},
+ {"name": "data", "type": {"type": "map", "values": "string"},"default":{}}
]
}}
]