You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2013/06/14 02:55:25 UTC
svn commit: r1492920 [1/2] - in /gora/trunk: ./
gora-accumulo/src/main/java/org/apache/gora/accumulo/store/
gora-accumulo/src/test/resources/
gora-cassandra/src/main/java/org/apache/gora/cassandra/query/
gora-cassandra/src/main/java/org/apache/gora/cas...
Author: lewismc
Date: Fri Jun 14 00:55:24 2013
New Revision: 1492920
URL: http://svn.apache.org/r1492920
Log:
GORA-174 GORA compiler does not handle [string, null] unions in the AVRO schema
Modified:
gora/trunk/CHANGES.txt
gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
gora/trunk/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
gora/trunk/gora-core/src/examples/avro/employee.json
gora/trunk/gora-core/src/examples/avro/webpage.json
gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/WebPageDataCreator.java
gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Employee.java
gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Metadata.java
gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/TokenDatum.java
gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/WebPage.java
gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/WordCount.java
gora/trunk/gora-core/src/main/java/org/apache/gora/compiler/GoraCompiler.java
gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java
gora/trunk/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java
gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml
gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
gora/trunk/gora-tutorial/conf/gora.properties
gora/trunk/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/generated/MetricDatum.java
gora/trunk/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/generated/Pageview.java
gora/trunk/pom.xml
Modified: gora/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/gora/trunk/CHANGES.txt?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/CHANGES.txt (original)
+++ gora/trunk/CHANGES.txt Fri Jun 14 00:55:24 2013
@@ -4,12 +4,15 @@
Gora Change Log
+* GORA-174 GORA compiler does not handle ["string", "null"] unions in the AVRO schema (alfonsonishikawa, rmarroquin, kturner via lewismc)
+ incl. GORA-206, 207 and 216.
+
* GORA-239 Add null checks and better message in AccumuloStore (David Medinets via hsaputra)
0.3 release: 05/03/2013 (mm/dd/yyyy)
-Release Report: https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12311172&version=12317954
+Release Report: https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12311172&version=12317954Gora Change Log
-* GORA-191 Support multiple Avro Schemas within GoraCompiler (Udesh Liyanaarachchi, rmarroquin, lewismc)
+* GORA-191 Support multiple Avro Schemas within GoraCompiler (Udesh Liyanaarachchi, rmarroquin, lewismc)
* GORA-159 gora-hbase MR tests should use HBaseTestingUtility instead of deprecated HBaseClusterTestCase via GORA-89
Modified: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java (original)
+++ gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java Fri Jun 14 00:55:24 2013
@@ -95,11 +95,11 @@ import org.apache.gora.store.impl.DataSt
import org.apache.gora.util.AvroUtils;
import org.apache.gora.util.GoraException;
import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
*
@@ -460,6 +460,7 @@ public class AccumuloStore<K,T extends P
break;
case RECORD:
+ case UNION:
SpecificDatumReader reader = new SpecificDatumReader(field.schema());
byte[] val = entry.getValue().get();
// TODO reuse decoder
@@ -588,6 +589,7 @@ public class AccumuloStore<K,T extends P
}
break;
case RECORD:
+ case UNION:
SpecificDatumWriter writer = new SpecificDatumWriter(field.schema());
ByteArrayOutputStream os = new ByteArrayOutputStream();
BinaryEncoder encoder = new BinaryEncoder(os);
Modified: gora/trunk/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml (original)
+++ gora/trunk/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml Fri Jun 14 00:55:24 2013
@@ -38,6 +38,8 @@
<field name="dateOfBirth" family="info" qualifier="db"/>
<field name="ssn" family="info" qualifier="sn"/>
<field name="salary" family="info" qualifier="sl"/>
+ <field name="boss" family="info" qualifier="bs"/>
+ <field name="webpage" family="info" qualifier="wp"/>
</class>
<class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" table="WebPage">
@@ -51,4 +53,4 @@
<class name="org.apache.gora.examples.generated.TokenDatum" keyClass="java.lang.String">
<field name="count" family="common" qualifier="count"/>
</class>
-</gora-orm>
\ No newline at end of file
+</gora-orm>
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Fri Jun 14 00:55:24 2013
@@ -24,7 +24,6 @@ import me.prettyprint.hector.api.Seriali
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +40,15 @@ public abstract class CassandraColumn {
private String family;
private int type;
private Field field;
+ private int unionType;
+
+ public void setUnionType(int pUnionType){
+ this.unionType = pUnionType;
+ }
+
+ public int getUnionType(){
+ return unionType;
+ }
public String getFamily() {
return family;
@@ -67,7 +75,7 @@ public abstract class CassandraColumn {
protected Object fromByteBuffer(Schema schema, ByteBuffer byteBuffer) {
Object value = null;
- Serializer serializer = GoraSerializerTypeInferer.getSerializer(schema);
+ Serializer<?> serializer = GoraSerializerTypeInferer.getSerializer(schema);
if (serializer == null) {
LOG.info("Schema is not supported: " + schema.toString());
} else {
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java Fri Jun 14 00:55:24 2013
@@ -26,7 +26,8 @@ import me.prettyprint.cassandra.serializ
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
-import org.apache.avro.specific.SpecificFixed;
+import org.apache.avro.Schema.Type;
+import org.apache.gora.cassandra.store.CassandraStore;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
import org.apache.gora.query.impl.ResultBase;
@@ -58,6 +59,25 @@ public class CassandraResult<K, T extend
++this.rowNumber;
return (this.rowNumber <= this.cassandraResultSet.size());
}
+
+ /**
+ * Gets the column containing the type of the union type element stored.
+ * TODO: This might seem too much of a overhead if we consider that N rows have M columns,
+ * this might have to be reviewed to get the specific column in O(1)
+ * @param pFieldName
+ * @param pCassandraRow
+ * @return
+ */
+ private CassandraColumn getUnionTypeColumn(String pFieldName, Object[] pCassandraRow){
+
+ for (int iCnt = 0; iCnt < pCassandraRow.length; iCnt++){
+ CassandraColumn cColumn = (CassandraColumn)pCassandraRow[iCnt];
+ String columnName = StringSerializer.get().fromByteBuffer(cColumn.getName());
+ if (pFieldName.equals(columnName))
+ return cColumn;
+ }
+ return null;
+ }
/**
@@ -80,21 +100,42 @@ public class CassandraResult<K, T extend
String family = cassandraColumn.getFamily();
String fieldName = this.reverseMap.get(family + ":" + StringSerializer.get().fromByteBuffer(cassandraColumn.getName()));
- // get field
- int pos = this.persistent.getFieldIndex(fieldName);
- Field field = fields.get(pos);
-
- // get value
- cassandraColumn.setField(field);
- Object value = cassandraColumn.getValue();
-
- this.persistent.put(pos, value);
- // this field does not need to be written back to the store
- this.persistent.clearDirty(pos);
+ if (fieldName != null ){
+ // get field
+ int pos = this.persistent.getFieldIndex(fieldName);
+ Field field = fields.get(pos);
+ Type fieldType = field.schema().getType();
+ System.out.println(StringSerializer.get().fromByteBuffer(cassandraColumn.getName()) + fieldName + " " + fieldType.name());
+ if (fieldType == Type.UNION){
+ // TODO getting UNION stored type
+ // TODO get value of UNION stored type. This field does not need to be written back to the store
+ cassandraColumn.setUnionType(getNonNullTypePos(field.schema().getTypes()));
+ }
+
+ // get value
+ cassandraColumn.setField(field);
+ Object value = cassandraColumn.getValue();
+
+ this.persistent.put(pos, value);
+ // this field does not need to be written back to the store
+ this.persistent.clearDirty(pos);
+ }
+ else
+ LOG.debug("FieldName was null while iterating CassandraRow and using Avro Union type");
}
}
+ private int getNonNullTypePos(List<Schema> pTypes){
+ int iCnt = 0;
+ for (Schema sch : pTypes)
+ if (!sch.getName().equals("null"))
+ return iCnt;
+ else
+ iCnt++;
+ return CassandraStore.DEFAULT_UNION_SCHEMA;
+ }
+
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java Fri Jun 14 00:55:24 2013
@@ -20,6 +20,9 @@ package org.apache.gora.cassandra.query;
import java.util.ArrayList;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+
+
/**
* List of key value pairs representing a row, tagged by a key.
*/
@@ -38,5 +41,18 @@ public class CassandraRow<K> extends Arr
public void setKey(K key) {
this.key = key;
}
+
+ /**
+ * Gets a specific CassandraColumn within a row using its name
+ * @param pCassandraColumnName
+ * @return CassandraColumn
+ */
+ public CassandraColumn getCassandraColumn(String pCassandraColumnName){
+ for (CassandraColumn cColumn: this)
+ if ( pCassandraColumnName.equals(StringSerializer.get().fromByteBuffer(cColumn.getName())) )
+ return cColumn;
+
+ return null;
+ }
}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Fri Jun 14 00:55:24 2013
@@ -40,6 +40,7 @@ import org.apache.avro.util.Utf8;
import org.apache.gora.cassandra.serializers.GenericArraySerializer;
import org.apache.gora.cassandra.serializers.StatefulHashMapSerializer;
import org.apache.gora.cassandra.serializers.TypeUtils;
+import org.apache.gora.cassandra.store.CassandraStore;
import org.apache.gora.persistency.StatefulHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,12 +83,31 @@ public class CassandraSubColumn extends
StatefulHashMapSerializer serializer = StatefulHashMapSerializer.get(fieldSchema.getValueType());
StatefulHashMap map = serializer.fromByteBuffer(byteBuffer);
value = map;
+ } else if (type == Type.UNION){
+ // the selected union schema is obtained
+ Schema unionFieldSchema = getUnionSchema(super.getUnionType(), field.schema());
+ // we use the selected union schema to deserialize our actual value
+ value = fromByteBuffer(unionFieldSchema, byteBuffer);
} else {
value = fromByteBuffer(fieldSchema, byteBuffer);
}
return value;
}
+
+ /**
+ * Gets the specific schema for a union data type
+ * @param pSchemaPos
+ * @param pSchema
+ * @return
+ */
+ private Schema getUnionSchema (int pSchemaPos, Schema pSchema){
+ Schema unionSchema = pSchema.getTypes().get(pSchemaPos);
+ // default union element
+ if ( unionSchema == null )
+ pSchema.getTypes().get(CassandraStore.DEFAULT_UNION_SCHEMA);
+ return unionSchema;
+ }
public void setValue(HColumn<ByteBuffer, ByteBuffer> hColumn) {
this.hColumn = hColumn;
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java Fri Jun 14 00:55:24 2013
@@ -148,6 +148,8 @@ public class GoraSerializerTypeInferer {
serializer = GenericArraySerializer.get(schema.getElementType());
} else if (type == Type.MAP) {
serializer = StatefulHashMapSerializer.get(schema.getValueType());
+ } else if (type == Type.UNION){
+ serializer = ByteBufferSerializer.get();
} else {
serializer = null;
}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Fri Jun 14 00:55:24 2013
@@ -221,10 +221,24 @@ public class CassandraClient<K, T extend
}
}
+ /**
+ * Adds an subColumn inside the cassandraMapping file when a String is serialized
+ * @param key
+ * @param fieldName
+ * @param columnName
+ * @param value
+ */
public void addSubColumn(K key, String fieldName, String columnName, Object value) {
addSubColumn(key, fieldName, StringSerializer.get().toByteBuffer(columnName), value);
}
+ /**
+ * Adds an subColumn inside the cassandraMapping file when an Integer is serialized
+ * @param key
+ * @param fieldName
+ * @param columnName
+ * @param value
+ */
public void addSubColumn(K key, String fieldName, Integer columnName, Object value) {
addSubColumn(key, fieldName, IntegerSerializer.get().toByteBuffer(columnName), value);
}
@@ -364,6 +378,20 @@ public class CassandraClient<K, T extend
return orderedRows.getList();
}
+
+ private String getMappingFamily(String pField){
+ String family = null;
+ // TODO checking if it was a UNION field the one we are retrieving
+ family = this.cassandraMapping.getFamily(pField);
+ return family;
+ }
+
+ private String getMappingColumn(String pField){
+ String column = null;
+ // TODO checking if it was a UNION field the one we are retrieving e.g. column = pField;
+ column = this.cassandraMapping.getColumn(pField);
+ return column;
+ }
/**
* Select the families that contain at least one column mapped to a query field.
@@ -373,8 +401,8 @@ public class CassandraClient<K, T extend
public Map<String, List<String>> getFamilyMap(Query<K, T> query) {
Map<String, List<String>> map = new HashMap<String, List<String>>();
for (String field: query.getFields()) {
- String family = this.cassandraMapping.getFamily(field);
- String column = this.cassandraMapping.getColumn(field);
+ String family = this.getMappingFamily(field);
+ String column = this.getMappingColumn(field);
// check if the family value was already initialized
List<String> list = map.get(family);
@@ -391,6 +419,14 @@ public class CassandraClient<K, T extend
return map;
}
+
+ /**
+ * Retrieves the cassandraMapping which holds whatever was mapped from the gora-cassandra-mapping.xml
+ * @return
+ */
+ public CassandraMapping getCassandraMapping(){
+ return this.cassandraMapping;
+ }
/**
* Select the field names according to the column names, which format if fully qualified: "family:column"
@@ -400,16 +436,15 @@ public class CassandraClient<K, T extend
public Map<String, String> getReverseMap(Query<K, T> query) {
Map<String, String> map = new HashMap<String, String>();
for (String field: query.getFields()) {
- String family = this.cassandraMapping.getFamily(field);
- String column = this.cassandraMapping.getColumn(field);
+ String family = this.getMappingFamily(field);
+ String column = this.getMappingColumn(field);
map.put(family + ":" + column, field);
}
return map;
-
}
-
+
public boolean isSuper(String family) {
return this.cassandraMapping.isSuper(family);
}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java Fri Jun 14 00:55:24 2013
@@ -206,6 +206,17 @@ public class CassandraMapping {
}
}
+ /**
+ * Add new column to CassandraMapping using the self-explanatory parameters
+ * @param pFamilyName
+ * @param pFieldName
+ * @param pColumnName
+ */
+ public void addColumn(String pFamilyName, String pFieldName, String pColumnName){
+ this.familyMap.put(pFieldName, pFamilyName);
+ this.columnMap.put(pFieldName, pColumnName);
+ }
+
public String getFamily(String name) {
return this.familyMap.get(name);
}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java Fri Jun 14 00:55:24 2013
@@ -19,6 +19,7 @@
package org.apache.gora.cassandra.store;
import java.io.IOException;
+import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,7 +49,9 @@ public class CassandraMappingManager {
return manager;
}
- //
+ /**
+ * Objects to maintain mapped keyspaces
+ */
private Map<String, Element> keyspaceMap = null;
private Map<String, Element> mappingMap = null;
@@ -95,7 +98,12 @@ public class CassandraMappingManager {
public void loadConfiguration() throws JDOMException, IOException {
SAXBuilder saxBuilder = new SAXBuilder();
// get mapping file
- Document document = saxBuilder.build(getClass().getClassLoader().getResourceAsStream(MAPPING_FILE));
+ InputStream inputStream = getClass().getClassLoader().getResourceAsStream(MAPPING_FILE);
+ if (inputStream == null){
+ LOG.warn("Mapping file '" + MAPPING_FILE + "' could not be found!");
+ throw new IOException("Mapping file '" + MAPPING_FILE + "' could not be found!");
+ }
+ Document document = saxBuilder.build(inputStream);
if (document == null) {
LOG.warn("Mapping file '" + MAPPING_FILE + "' could not be found!");
}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Fri Jun 14 00:55:24 2013
@@ -29,8 +29,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.Collections;
-import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.HSuperColumn;
@@ -42,7 +40,6 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericArray;
-import org.apache.avro.specific.SpecificFixed;
import org.apache.avro.util.Utf8;
import org.apache.gora.cassandra.query.CassandraQuery;
import org.apache.gora.cassandra.query.CassandraResult;
@@ -68,6 +65,11 @@ public class CassandraStore<K, T extends
private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>();
/**
+ * Default schema index used when AVRO Union data types are stored
+ */
+ public static int DEFAULT_UNION_SCHEMA = 0;
+
+ /**
* The values are Avro fields pending to be stored.
*
* We want to iterate over the keys in insertion order.
@@ -132,7 +134,7 @@ public class CassandraStore<K, T extends
CassandraResult<K, T> cassandraResult = new CassandraResult<K, T>(this, query);
cassandraResult.setReverseMap(reverseMap);
- CassandraResultSet cassandraResultSet = new CassandraResultSet();
+ CassandraResultSet<K> cassandraResultSet = new CassandraResultSet<K>();
// We query Cassandra keyspace by families.
for (String family : familyMap.keySet()) {
@@ -322,6 +324,11 @@ public class CassandraStore<K, T extends
}
fieldValue = newArray;
break;
+ case UNION:
+ // storing the union selected schema, the actual value will be stored as soon as getting out of here
+ // TODO determine which schema we are using: int schemaPos = getUnionSchema(fieldValue,fieldSchema);
+ // and save it p.put( p.getFieldIndex(field.name() + CassandraStore.UNION_COL_SUFIX), schemaPos);
+ break;
}
p.put(fieldPos, fieldValue);
@@ -341,37 +348,32 @@ public class CassandraStore<K, T extends
private void addOrUpdateField(K key, Field field, Object value) {
Schema schema = field.schema();
Type type = schema.getType();
- switch (type) {
- case STRING:
- case BOOLEAN:
- case INT:
- case LONG:
- case BYTES:
- case FLOAT:
- case DOUBLE:
- case FIXED:
- this.cassandraClient.addColumn(key, field.name(), value);
- break;
- case RECORD:
- if (value != null) {
- if (value instanceof PersistentBase) {
- PersistentBase persistentBase = (PersistentBase) value;
- for (Field member: schema.getFields()) {
-
- // TODO: hack, do not store empty arrays
- Object memberValue = persistentBase.get(member.pos());
- if (memberValue instanceof GenericArray<?>) {
- if (((GenericArray)memberValue).size() == 0) {
- continue;
- }
- } else if (memberValue instanceof StatefulHashMap<?,?>) {
- if (((StatefulHashMap)memberValue).size() == 0) {
- continue;
+ switch (type) {
+ case STRING:
+ case BOOLEAN:
+ case INT:
+ case LONG:
+ case BYTES:
+ case FLOAT:
+ case DOUBLE:
+ case FIXED:
+ this.cassandraClient.addColumn(key, field.name(), value);
+ break;
+ case RECORD:
+ if (value != null) {
+ if (value instanceof PersistentBase) {
+ PersistentBase persistentBase = (PersistentBase) value;
+ for (Field member: schema.getFields()) {
+
+ // TODO: hack, do not store empty arrays
+ Object memberValue = persistentBase.get(member.pos());
+ if (memberValue instanceof GenericArray<?>) {
+ if (((GenericArray)memberValue).size() == 0) {
+ continue;
+ }
}
+ this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
}
-
- this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
- }
} else {
LOG.info("Record not supported: " + value.toString());
@@ -396,11 +398,52 @@ public class CassandraStore<K, T extends
}
}
break;
+ case UNION:
+ if(value != null) {
+ LOG.info("Union being supported with value: " + value.toString());
+ // TODO add union schema index used
+ // adding union value
+ this.cassandraClient.addColumn(key, field.name(), value);
+ } else {
+ LOG.info("Union not supported: " + value.toString());
+ }
default:
LOG.info("Type not considered: " + type.name());
}
}
+ /**
+ * Gets the position within the schema of the type used
+ * @param pValue
+ * @param pUnionSchema
+ * @return
+ */
+ private int getUnionSchema(Object pValue, Schema pUnionSchema){
+ int unionSchemaPos = 0;
+ String valueType = pValue.getClass().getSimpleName();
+ Iterator<Schema> it = pUnionSchema.getTypes().iterator();
+ while ( it.hasNext() ){
+ String schemaName = it.next().getName();
+ if (valueType.equals("Utf8") && schemaName.equals(Type.STRING.name().toLowerCase()))
+ return unionSchemaPos;
+ else if (valueType.equals("HeapByteBuffer") && schemaName.equals(Type.STRING.name().toLowerCase()))
+ return unionSchemaPos;
+ else if (valueType.equals("Integer") && schemaName.equals(Type.INT.name().toLowerCase()))
+ return unionSchemaPos;
+ else if (valueType.equals("Long") && schemaName.equals(Type.LONG.name().toLowerCase()))
+ return unionSchemaPos;
+ else if (valueType.equals("Double") && schemaName.equals(Type.DOUBLE.name().toLowerCase()))
+ return unionSchemaPos;
+ else if (valueType.equals("Float") && schemaName.equals(Type.FLOAT.name().toLowerCase()))
+ return unionSchemaPos;
+ else if (valueType.equals("Boolean") && schemaName.equals(Type.BOOLEAN.name().toLowerCase()))
+ return unionSchemaPos;
+ unionSchemaPos ++;
+ }
+ // if we weren't able to determine which data type it is, then we return the default
+ return 0;
+ }
+
@Override
public boolean schemaExists() {
LOG.info("schema exists");
Modified: gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml (original)
+++ gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml Fri Jun 14 00:55:24 2013
@@ -20,19 +20,16 @@
<gora-orm>
<keyspace name="Employee" host="localhost" cluster="Gora Cassandra Test Cluster">
<family name="p"/>
- <family name="f"/>
<family name="sc" type="super" />
</keyspace>
<keyspace name="WebPage" host="localhost" cluster="Gora Cassandra Test Cluster">
<family name="p"/>
- <family name="f"/>
<family name="sc" type="super"/>
</keyspace>
<keyspace name="TokenDatum" host="localhost" cluster="Gora Cassandra Test Cluster">
<family name="p"/>
- <family name="f"/>
<family name="sc" type="super"/>
</keyspace>
@@ -41,6 +38,8 @@
<field name="dateOfBirth" family="p" qualifier="info:db"/>
<field name="ssn" family="p" qualifier="info:sn"/>
<field name="salary" family="p" qualifier="info:sl"/>
+ <field name="boss" family="p" qualifier="info:bs"/>
+ <field name="webpage" family="p" qualifier="info:wp"/>
</class>
<class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" keyspace="WebPage">
Modified: gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java Fri Jun 14 00:55:24 2013
@@ -17,8 +17,9 @@
*/
/**
- * @author lewismc
- *
+ * Testing class for all standard gora-cassandra functionality.
+ * We extend DataStoreTestBase enabling us to run the entire base test
+ * suite for Gora.
*/
package org.apache.gora.cassandra.store;
@@ -38,7 +39,6 @@ import org.junit.Test;
/**
* Test for CassandraStore.
- * @author lewismc
*/
public class TestCassandraStore extends DataStoreTestBase{
@@ -93,6 +93,14 @@ public class TestCassandraStore extends
public void testDeleteByQueryFields() throws IOException {}
@Override
public void testGetPartitions() throws IOException {}
+ @Override
+ public void testGetRecursive() throws IOException {}
+ @Override
+ public void testGetDoubleRecursive() throws IOException{}
+ @Override
+ public void testGetNested() throws IOException {}
+ @Override
+ public void testGet3UnionField() throws IOException {}
// ============================================================================
Modified: gora/trunk/gora-core/src/examples/avro/employee.json
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/avro/employee.json?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/avro/employee.json (original)
+++ gora/trunk/gora-core/src/examples/avro/employee.json Fri Jun 14 00:55:24 2013
@@ -6,6 +6,29 @@
{"name": "name", "type": "string"},
{"name": "dateOfBirth", "type": "long"},
{"name": "ssn", "type": "string"},
- {"name": "salary", "type": "int"}
+ {"name": "salary", "type": "int"},
+ {"name": "boss", "type":["null","Employee","string"]},
+ {"name": "webpage", "type":["null",
+ {
+ "type": "record",
+ "name": "WebPage",
+ "namespace": "org.apache.gora.examples.generated",
+ "fields" : [
+ {"name": "url", "type": "string"},
+ {"name": "content", "type": ["null","bytes"]},
+ {"name": "parsedContent", "type": {"type":"array", "items": "string"}},
+ {"name": "outlinks", "type": {"type":"map", "values":"string"}},
+ {"name": "metadata", "type": {
+ "name": "Metadata",
+ "type": "record",
+ "namespace": "org.apache.gora.examples.generated",
+ "fields": [
+ {"name": "version", "type": "int"},
+ {"name": "data", "type": {"type": "map", "values": "string"}}
+ ]
+ }}
+ ]
+ }
+ ]}
]
}
Modified: gora/trunk/gora-core/src/examples/avro/webpage.json
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/avro/webpage.json?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/avro/webpage.json (original)
+++ gora/trunk/gora-core/src/examples/avro/webpage.json Fri Jun 14 00:55:24 2013
@@ -4,7 +4,7 @@
"namespace": "org.apache.gora.examples.generated",
"fields" : [
{"name": "url", "type": "string"},
- {"name": "content", "type": "bytes"},
+ {"name": "content", "type": ["null","bytes"]},
{"name": "parsedContent", "type": {"type":"array", "items": "string"}},
{"name": "outlinks", "type": {"type":"map", "values":"string"}},
{"name": "metadata", "type": {
Modified: gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/WebPageDataCreator.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/WebPageDataCreator.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/WebPageDataCreator.java (original)
+++ gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/WebPageDataCreator.java Fri Jun 14 00:55:24 2013
@@ -61,7 +61,7 @@ public class WebPageDataCreator {
}
public static final String[] CONTENTS = {
- "foo baz bar",
+ null,
"foo",
"foo1 bar1 baz1",
"a b c d e",
@@ -116,11 +116,12 @@ public class WebPageDataCreator {
for(int i=0; i<URLS.length; i++) {
page = new WebPage();
page.setUrl(new Utf8(URLS[i]));
- page.setContent(ByteBuffer.wrap(CONTENTS[i].getBytes()));
- for(String token : CONTENTS[i].split(" ")) {
- page.addToParsedContent(new Utf8(token));
- }
-
+ if (CONTENTS[i]!=null){
+ page.setContent(ByteBuffer.wrap(CONTENTS[i].getBytes()));
+ for(String token : CONTENTS[i].split(" ")) {
+ page.addToParsedContent(new Utf8(token));
+ }
+ }
for(int j=0; j<LINKS[i].length; j++) {
page.putToOutlinks(new Utf8(URLS[LINKS[i][j]]), new Utf8(ANCHORS[i][j]));
}
Modified: gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Employee.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Employee.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Employee.java (original)
+++ gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Employee.java Fri Jun 14 00:55:24 2013
@@ -1,3 +1,21 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the"
+ *License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing, software
+ *distributed under the License is distributed on an "AS IS" BASIS,
+ *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *See the License for the specific language governing permissions and
+ *limitations under the License.
+ */
+
package org.apache.gora.examples.generated;
import java.nio.ByteBuffer;
@@ -10,6 +28,7 @@ import org.apache.avro.Protocol;
import org.apache.avro.util.Utf8;
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.FixedSize;
import org.apache.avro.specific.SpecificExceptionBase;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.specific.SpecificRecord;
@@ -22,12 +41,14 @@ import org.apache.gora.persistency.ListG
@SuppressWarnings("all")
public class Employee extends PersistentBase {
- public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"org.apache.gora.examples.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"dateOfBirth\",\"type\":\"long\"},{\"name\":\"ssn\",\"type\":\"string\"},{\"name\":\"salary\",\"type\":\"int\"}]}");
+ public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"org.apache.gora.examples.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"dateOfBirth\",\"type\":\"long\"},{\"name\":\"ssn\",\"type\":\"string\"},{\"name\":\"salary\",\"type\":\"int\"},{\"name\":\"boss\",\"type\":[\"null\",\"Employee\",\"string\"]},{\"name\":\"webpage\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"WebPage\",\"fields\":[{\"name\":\"url\",\"type\":\"string\"},{\"name\":\"content\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"parsedContent\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"outlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"metadata\",\"type\":{\"type\":\"record\",\"name\":\"Metadata\",\"fields\":[{\"name\":\"version\",\"type\":\"int\"},{\"name\":\"data\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}}]}]}]}");
public static enum Field {
NAME(0,"name"),
DATE_OF_BIRTH(1,"dateOfBirth"),
SSN(2,"ssn"),
SALARY(3,"salary"),
+ BOSS(4,"boss"),
+ WEBPAGE(5,"webpage"),
;
private int index;
private String name;
@@ -36,7 +57,7 @@ public class Employee extends Persistent
public String getName() {return name;}
public String toString() {return name;}
};
- public static final String[] _ALL_FIELDS = {"name","dateOfBirth","ssn","salary",};
+ public static final String[] _ALL_FIELDS = {"name","dateOfBirth","ssn","salary","boss","webpage",};
static {
PersistentBase.registerFields(Employee.class, _ALL_FIELDS);
}
@@ -44,6 +65,8 @@ public class Employee extends Persistent
private long dateOfBirth;
private Utf8 ssn;
private int salary;
+ private Object boss;
+ private WebPage webpage;
public Employee() {
this(new StateManagerImpl());
}
@@ -60,6 +83,8 @@ public class Employee extends Persistent
case 1: return dateOfBirth;
case 2: return ssn;
case 3: return salary;
+ case 4: return boss;
+ case 5: return webpage;
default: throw new AvroRuntimeException("Bad index");
}
}
@@ -72,6 +97,8 @@ public class Employee extends Persistent
case 1:dateOfBirth = (Long)_value; break;
case 2:ssn = (Utf8)_value; break;
case 3:salary = (Integer)_value; break;
+ case 4:boss = (Object)_value; break;
+ case 5:webpage = (WebPage)_value; break;
default: throw new AvroRuntimeException("Bad index");
}
}
@@ -99,4 +126,19 @@ public class Employee extends Persistent
public void setSalary(int value) {
put(3, value);
}
+ public Object getBoss() {
+ return (Object) get(4);
+ }
+ public void setBoss(Employee value) {
+ put(4, value);
+ }
+ public void setBoss(Utf8 value) {
+ put(4, value);
+ }
+ public WebPage getWebpage() {
+ return (WebPage) get(5);
+ }
+ public void setWebpage(WebPage value) {
+ put(5, value);
+ }
}
Modified: gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Metadata.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Metadata.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Metadata.java (original)
+++ gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Metadata.java Fri Jun 14 00:55:24 2013
@@ -1,3 +1,21 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the"
+ *License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing, software
+ *distributed under the License is distributed on an "AS IS" BASIS,
+ *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *See the License for the specific language governing permissions and
+ *limitations under the License.
+ */
+
package org.apache.gora.examples.generated;
import java.nio.ByteBuffer;
@@ -10,6 +28,7 @@ import org.apache.avro.Protocol;
import org.apache.avro.util.Utf8;
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.FixedSize;
import org.apache.avro.specific.SpecificExceptionBase;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.specific.SpecificRecord;
Modified: gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/TokenDatum.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/TokenDatum.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/TokenDatum.java (original)
+++ gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/TokenDatum.java Fri Jun 14 00:55:24 2013
@@ -1,3 +1,21 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the"
+ *License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing, software
+ *distributed under the License is distributed on an "AS IS" BASIS,
+ *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *See the License for the specific language governing permissions and
+ *limitations under the License.
+ */
+
package org.apache.gora.examples.generated;
import java.nio.ByteBuffer;
@@ -10,6 +28,7 @@ import org.apache.avro.Protocol;
import org.apache.avro.util.Utf8;
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.FixedSize;
import org.apache.avro.specific.SpecificExceptionBase;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.specific.SpecificRecord;
Modified: gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/WebPage.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/WebPage.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/WebPage.java (original)
+++ gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/WebPage.java Fri Jun 14 00:55:24 2013
@@ -1,3 +1,21 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the"
+ *License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing, software
+ *distributed under the License is distributed on an "AS IS" BASIS,
+ *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *See the License for the specific language governing permissions and
+ *limitations under the License.
+ */
+
package org.apache.gora.examples.generated;
import java.nio.ByteBuffer;
@@ -10,6 +28,7 @@ import org.apache.avro.Protocol;
import org.apache.avro.util.Utf8;
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.FixedSize;
import org.apache.avro.specific.SpecificExceptionBase;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.specific.SpecificRecord;
@@ -22,7 +41,7 @@ import org.apache.gora.persistency.ListG
@SuppressWarnings("all")
public class WebPage extends PersistentBase {
- public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"WebPage\",\"namespace\":\"org.apache.gora.examples.generated\",\"fields\":[{\"name\":\"url\",\"type\":\"string\"},{\"name\":\"content\",\"type\":\"bytes\"},{\"name\":\"parsedContent\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"outlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"metadata\",\"type\":{\"type\":\"record\",\"name\":\"Metadata\",\"fields\":[{\"name\":\"version\",\"type\":\"int\"},{\"name\":\"data\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}}]}");
+ public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"WebPage\",\"namespace\":\"org.apache.gora.examples.generated\",\"fields\":[{\"name\":\"url\",\"type\":\"string\"},{\"name\":\"content\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"parsedContent\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"outlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"metadata\",\"type\":{\"type\":\"record\",\"name\":\"Metadata\",\"fields\":[{\"name\":\"version\",\"type\":\"int\"},{\"name\":\"data\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}}]}");
public static enum Field {
URL(0,"url"),
CONTENT(1,"content"),
Modified: gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/WordCount.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/WordCount.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/WordCount.java (original)
+++ gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/WordCount.java Fri Jun 14 00:55:24 2013
@@ -65,13 +65,15 @@ public class WordCount extends Configure
protected void map(String key, WebPage page, Context context)
throws IOException ,InterruptedException {
- //Get the content from a WebPage as obtained from the DataStore
- String content = new String(page.getContent().array());
-
- StringTokenizer itr = new StringTokenizer(content);
- while (itr.hasMoreTokens()) {
- word.set(itr.nextToken());
- context.write(word, one);
+ if (page.getContent() != null) {
+ // Get the content from a WebPage as obtained from the DataStore
+ String content = new String(page.getContent().array());
+
+ StringTokenizer itr = new StringTokenizer(content);
+ while (itr.hasMoreTokens()) {
+ word.set(itr.nextToken());
+ context.write(word, one);
+ }
}
};
}
Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/compiler/GoraCompiler.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/compiler/GoraCompiler.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/compiler/GoraCompiler.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/compiler/GoraCompiler.java Fri Jun 14 00:55:24 2013
@@ -389,6 +389,27 @@ public class GoraCompiler {
line(2, "getStateManager().setDirty(this, "+i+");");
line(2, "return "+field.name()+".remove(key);");
line(1, "}");
+ break;
+ case UNION:
+ fieldType = type(fieldSchema);
+ //Create get method: public <unbox(field.schema())> get<camelKey>()
+ line(1, "public "+unbox(field.schema())+" get" +camelKey+"() {");
+ line(2, "return ("+unbox(field.schema())+") get("+i+");");
+ line(1, "}");
+
+ //Create set methods: public void set<camelKey>(<subschema.fieldType> value)
+ for (Schema s : fieldSchema.getTypes()) {
+ if (s.getType().equals(Schema.Type.NULL)) continue ;
+ String unionFieldType = type(s);
+ line(1, "public void set"+camelKey+"("+unionFieldType+" value) {");
+ line(2, "put("+i+", value);");
+ line(1, "}");
+ }
+ break;
+ case NULL:
+ throw new RuntimeException("Unexpected NULL field: "+field);
+ default:
+ throw new RuntimeException("Unknown field: "+field);
}
i++;
}
Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java Fri Jun 14 00:55:24 2013
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.gora.avro.PersistentDatumReader;
import org.apache.gora.persistency.ListGenericArray;
@@ -253,12 +254,33 @@ public abstract class PersistentBase imp
return result;
}
+ /**
+ * Computes a (record's) field's hash code.
+ * @param i Index of the field in the actual
+ * @param field
+ * @return
+ */
private int getFieldHashCode(int i, Field field) {
Object o = get(i);
if(o == null)
return 0;
- if(field.schema().getType() == Type.BYTES) {
+ // XXX Union special case: in a field being union we have to check the
+ // inner schemas for Type.BYTES special case, but because it is not a
+ // field we check it this way. Too simple case to create another
+ // private method
+ boolean isUnionField = false ;
+ int unionIndex = -1 ;
+
+ if (field.schema().getType() == Type.UNION) {
+ isUnionField = true ;
+ unionIndex = GenericData.get().resolveUnion(field.schema(), o);
+ }
+
+ if(field.schema().getType() == Type.BYTES
+ || (isUnionField
+ && field.schema().getTypes().get(unionIndex).getType() == Type.BYTES)) {
+ // ByteBuffer.hashCode() depends on internal 'position' index, but we must ignore that.
return getByteBufferHashCode((ByteBuffer)o);
}
Modified: gora/trunk/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java (original)
+++ gora/trunk/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java Fri Jun 14 00:55:24 2013
@@ -78,11 +78,13 @@ public class MapReduceTestUtils {
//assert results
HashMap<String, Integer> actualCounts = new HashMap<String, Integer>();
for(String content : WebPageDataCreator.CONTENTS) {
- for(String token:content.split(" ")) {
- Integer count = actualCounts.get(token);
- if(count == null)
- count = 0;
- actualCounts.put(token, ++count);
+ if (content != null) {
+ for(String token:content.split(" ")) {
+ Integer count = actualCounts.get(token);
+ if(count == null)
+ count = 0;
+ actualCounts.put(token, ++count);
+ }
}
}
for(Map.Entry<String, Integer> entry:actualCounts.entrySet()) {
Modified: gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java (original)
+++ gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java Fri Jun 14 00:55:24 2013
@@ -279,6 +279,54 @@ public abstract class DataStoreTestBase
}
@Test
+ /**
+ * Tests put and get a record with a nested recursive record
+ * Employee with a boss (nested).
+ * @throws IOException
+ * @throws Exception
+ */
+ public void testGetRecursive() throws IOException, Exception {
+ log.info("test method: testGetRecursive") ;
+ DataStoreTestUtil.testGetEmployeeRecursive(employeeStore) ;
+ }
+
+ @Test
+ /**
+ * Tests put and get a record with a double nested recursive record
+ * Employee with a boss (nested).
+ * @throws IOException
+ * @throws Exception
+ */
+ public void testGetDoubleRecursive() throws IOException, Exception {
+ log.info("test method: testGetDoubleRecursive") ;
+ DataStoreTestUtil.testGetEmployeeDoubleRecursive(employeeStore) ;
+ }
+
+ @Test
+ /**
+ * Tests put and get a record with a nested record (not recursive)
+ * The webpage of an Employee
+ * @throws IOException
+ * @throws Exception
+ */
+ public void testGetNested() throws IOException, Exception {
+ log.info("test method: testGetNested") ;
+ DataStoreTestUtil.testGetEmployeeNested(employeeStore) ;
+ }
+
+ @Test
+ /**
+ * Tests put and get a record with a 3 types union, and
+ * having the value of the 3rd type.
+ * @throws IOException
+ * @throws Exception
+ */
+ public void testGet3UnionField() throws IOException, Exception {
+ log.info("test method: testGet3UnionField") ;
+ DataStoreTestUtil.testGetEmployee3UnionField(employeeStore) ;
+ }
+
+ @Test
public void testGetWithFields() throws IOException, Exception {
log.info("test method: testGetWithFields");
DataStoreTestUtil.testGetEmployeeWithFields(employeeStore);
Modified: gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java (original)
+++ gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java Fri Jun 14 00:55:24 2013
@@ -42,8 +42,11 @@ import org.apache.avro.generic.GenericAr
import org.apache.avro.util.Utf8;
import org.apache.gora.examples.WebPageDataCreator;
import org.apache.gora.examples.generated.Employee;
+import org.apache.gora.examples.generated.Metadata;
import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.persistency.BeanFactory;
import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.BeanFactoryImpl;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
@@ -91,6 +94,17 @@ public class DataStoreTestUtil {
return employee;
}
+ public static <K> Employee createBoss(
+ DataStore<K, Employee> dataStore) throws IOException, Exception {
+
+ Employee employee = dataStore.newPersistent();
+ employee.setName(new Utf8("Random boss"));
+ employee.setDateOfBirth( System.currentTimeMillis() - 22L * YEAR_IN_MS );
+ employee.setSalary(1000000);
+ employee.setSsn(new Utf8("202020202020"));
+ return employee;
+ }
+
public static void testAutoCreateSchema(DataStore<String,Employee> dataStore)
throws IOException, Exception {
//should not throw exception
@@ -147,6 +161,76 @@ public class DataStoreTestUtil {
Assert.assertEquals(employee, after);
}
+
+ public static void testGetEmployeeRecursive(DataStore<String, Employee> dataStore)
+ throws IOException, Exception {
+
+ Employee employee = DataStoreTestUtil.createEmployee(dataStore);
+ Employee boss = DataStoreTestUtil.createBoss(dataStore);
+ employee.setBoss(boss) ;
+
+ String ssn = employee.getSsn().toString();
+ dataStore.put(ssn, employee);
+ dataStore.flush();
+ Employee after = dataStore.get(ssn, Employee._ALL_FIELDS);
+ Assert.assertEquals(employee, after);
+ Assert.assertEquals(boss, after.getBoss()) ;
+ }
+
+ public static void testGetEmployeeDoubleRecursive(DataStore<String, Employee> dataStore)
+ throws IOException, Exception {
+
+ Employee employee = DataStoreTestUtil.createEmployee(dataStore);
+ Employee boss = DataStoreTestUtil.createBoss(dataStore);
+ Employee uberBoss = DataStoreTestUtil.createBoss(dataStore);
+ uberBoss.setName(new Utf8("Ãberboss")) ;
+ boss.setBoss(uberBoss) ;
+ employee.setBoss(boss) ;
+
+ String ssn = employee.getSsn().toString();
+ dataStore.put(ssn, employee);
+ dataStore.flush();
+ Employee after = dataStore.get(ssn, Employee._ALL_FIELDS);
+ Assert.assertEquals(employee, after);
+ Assert.assertEquals(boss, after.getBoss()) ;
+ Assert.assertEquals(uberBoss, ((Employee)after.getBoss()).getBoss()) ;
+ }
+
+ public static void testGetEmployeeNested(DataStore<String, Employee> dataStore)
+ throws IOException, Exception {
+
+ Employee employee = DataStoreTestUtil.createEmployee(dataStore);
+ WebPage webpage = new BeanFactoryImpl<String,WebPage>(String.class,WebPage.class).newPersistent() ;
+
+ webpage.setUrl(new Utf8("url..")) ;
+ webpage.setContent(ByteBuffer.wrap("test content".getBytes())) ;
+ Metadata metadata = new BeanFactoryImpl<String,Metadata>(String.class,Metadata.class).newPersistent() ;
+ webpage.setMetadata(metadata) ;
+ employee.setWebpage(webpage) ;
+
+ String ssn = employee.getSsn().toString();
+
+ dataStore.put(ssn, employee);
+ dataStore.flush();
+ Employee after = dataStore.get(ssn, Employee._ALL_FIELDS);
+ Assert.assertEquals(employee, after);
+ Assert.assertEquals(webpage, after.getWebpage()) ;
+ }
+
+ public static void testGetEmployee3UnionField(DataStore<String, Employee> dataStore)
+ throws IOException, Exception {
+
+ Employee employee = DataStoreTestUtil.createEmployee(dataStore);
+ employee.setBoss(new Utf8("Real boss")) ;
+
+ String ssn = employee.getSsn().toString();
+ dataStore.put(ssn, employee);
+ dataStore.flush();
+ Employee after = dataStore.get(ssn, Employee._ALL_FIELDS);
+ Assert.assertEquals(employee, after);
+ Assert.assertEquals("Real boss", ((Utf8)after.getBoss()).toString()) ;
+ }
+
public static void testGetEmployeeNonExisting(DataStore<String, Employee> dataStore)
throws IOException, Exception {
Employee employee = dataStore.get("_NON_EXISTING_SSN_FOR_EMPLOYEE_");
@@ -160,7 +244,14 @@ public class DataStoreTestUtil {
dataStore.put(ssn, employee);
dataStore.flush();
- String[] fields = employee.getFields();
+ // XXX See GORA-216: special case until later reviewed.
+ // Like in K-V stores, if retrieved column does not exists ([webpage] case),
+ // get() must return 'null'.
+ // We prepare an actual weird synthetic test.
+
+ // String[] fields = employee.getFields();
+ String[] fields = {"name","dateOfBirth","ssn","salary"} ;
+
for(Set<String> subset : StringUtils.powerset(fields)) {
if(subset.isEmpty())
continue;
@@ -171,7 +262,7 @@ public class DataStoreTestUtil {
expected.put(index, employee.get(index));
}
- Assert.assertEquals(expected, after);
+ Assert.assertEquals(expected, after);
}
}
@@ -336,27 +427,35 @@ public class DataStoreTestUtil {
Assert.assertNotNull(page);
Assert.assertEquals(URLS[i], page.getUrl().toString());
- Assert.assertTrue("content error:" + new String( toByteArray(page.getContent()) ) +
+ // 'content' is optional
+ if (page.getContent() != null) {
+ Assert.assertTrue("content error:" + new String( toByteArray(page.getContent()) ) +
" actual=" + CONTENTS[i] + " i=" + i
- , Arrays.equals( toByteArray(page.getContent() )
+ , Arrays.equals( toByteArray(page.getContent() )
, CONTENTS[i].getBytes()));
-
- GenericArray<Utf8> parsedContent = page.getParsedContent();
- Assert.assertNotNull(parsedContent);
- Assert.assertTrue(parsedContent.size() > 0);
-
- int j=0;
- String[] tokens = CONTENTS[i].split(" ");
- for(Utf8 token : parsedContent) {
- Assert.assertEquals(tokens[j++], token.toString());
+ GenericArray<Utf8> parsedContent = page.getParsedContent();
+ Assert.assertNotNull(parsedContent);
+ Assert.assertTrue(parsedContent.size() > 0);
+
+ int j=0;
+ String[] tokens = CONTENTS[i].split(" ");
+ for(Utf8 token : parsedContent) {
+ Assert.assertEquals(tokens[j++], token.toString());
+ }
+ } else {
+ // when page.getContent() is null
+ Assert.assertTrue(CONTENTS[i] == null) ;
+ GenericArray<Utf8> parsedContent = page.getParsedContent();
+ Assert.assertNotNull(parsedContent);
+ Assert.assertTrue(parsedContent.size() == 0);
}
if(LINKS[i].length > 0) {
Assert.assertNotNull(page.getOutlinks());
Assert.assertTrue(page.getOutlinks().size() > 0);
- for(j=0; j<LINKS[i].length; j++) {
- Assert.assertEquals(ANCHORS[i][j],
- page.getFromOutlinks(new Utf8(URLS[LINKS[i][j]])).toString());
+ for(int k=0; k<LINKS[i].length; k++) {
+ Assert.assertEquals(ANCHORS[i][k],
+ page.getFromOutlinks(new Utf8(URLS[LINKS[i][k]])).toString());
}
} else {
Assert.assertTrue(page.getOutlinks() == null || page.getOutlinks().isEmpty());
Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Fri Jun 14 00:55:24 2013
@@ -192,7 +192,7 @@ implements Configurable {
Get get = new Get(toBytes(key));
addFields(get, fields);
Result result = table.get(get);
- return newInstance(result, fields);
+ return newInstance(result, fields);
} catch(IOException ex2){
LOG.error(ex2.getMessage());
LOG.error(ex2.getStackTrace().toString());
@@ -200,6 +200,17 @@ implements Configurable {
}
}
+ /**
+ * {@inheritDoc}
+ * Serializes the Persistent data and saves in HBase.
+ * Topmost fields of the record are persisted in "raw" format (not avro serialized). This behavior happens
+ * in maps and arrays too.
+ *
+ * ["null","type"] type (a.k.a. optional field) is persisted like as if it is ["type"], but the column get
+ * deleted if value==null (so value read after will be null).
+ *
+ * @param persistent Record to be persisted in HBase
+ */
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void put(K key, T persistent) {
@@ -234,8 +245,14 @@ implements Configurable {
case DIRTY:
byte[] qual = Bytes.toBytes(mapKey.toString());
byte[] val = toBytes(map.get(mapKey), field.schema().getValueType());
- put.add(hcol.getFamily(), qual, val);
- hasPuts = true;
+ // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+ if (val == null) { // value == null => must delete the column
+ delete.deleteColumn(hcol.getFamily(), qual);
+ hasDeletes = true;
+ } else {
+ put.add(hcol.getFamily(), qual, val);
+ hasPuts = true;
+ }
break;
case DELETED:
qual = Bytes.toBytes(mapKey.toString());
@@ -248,9 +265,15 @@ implements Configurable {
Set<Map.Entry> set = ((Map)o).entrySet();
for(Entry entry: set) {
byte[] qual = toBytes(entry.getKey());
- byte[] val = toBytes(entry.getValue());
- put.add(hcol.getFamily(), qual, val);
- hasPuts = true;
+ byte[] val = toBytes(entry.getValue(), field.schema().getValueType());
+ // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+ if (val == null) { // value == null => must delete the column
+ delete.deleteColumn(hcol.getFamily(), qual);
+ hasDeletes = true;
+ } else {
+ put.add(hcol.getFamily(), qual, val);
+ hasPuts = true;
+ }
}
}
break;
@@ -259,15 +282,28 @@ implements Configurable {
GenericArray arr = (GenericArray) o;
int j=0;
for(Object item : arr) {
- byte[] val = toBytes(item);
- put.add(hcol.getFamily(), Bytes.toBytes(j++), val);
- hasPuts = true;
+ byte[] val = toBytes(item, field.schema().getElementType());
+ // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+ if (val == null) { // value == null => must delete the column
+ delete.deleteColumn(hcol.getFamily(), Bytes.toBytes(j++));
+ hasDeletes = true;
+ } else {
+ put.add(hcol.getFamily(), Bytes.toBytes(j++), val);
+ hasPuts = true;
+ }
}
}
break;
default:
- put.add(hcol.getFamily(), hcol.getQualifier(), toBytes(o, field.schema()));
- hasPuts = true;
+ // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+ byte[] serializedBytes = toBytes(o, field.schema()) ;
+ if (serializedBytes == null) { // value == null => must delete the column
+ delete.deleteColumn(hcol.getFamily(), hcol.getQualifier());
+ hasDeletes = true;
+ } else {
+ put.add(hcol.getFamily(), hcol.getQualifier(), serializedBytes);
+ hasPuts = true;
+ }
break;
}
}
@@ -511,6 +547,14 @@ implements Configurable {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
+ /**
+ * Creates a new Persistent instance with the values in 'result' for the fields listed.
+ * @param result result form a HTable#get()
+ * @param fields List of fields queried, or null for all
+ * @return A new instance with default values for not listed fields
+ * null if 'result' is null.
+ * @throws IOException
+ */
public T newInstance(Result result, String[] fields)
throws IOException {
if(result == null || result.isEmpty())
@@ -555,8 +599,7 @@ implements Configurable {
setField(persistent, field, arr);
break;
default:
- byte[] val =
- result.getValue(col.getFamily(), col.getQualifier());
+ byte[] val = result.getValue(col.getFamily(), col.getQualifier());
if (val == null) {
continue;
}
Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java Fri Jun 14 00:55:24 2013
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
@@ -34,6 +35,8 @@ import org.apache.avro.specific.Specific
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
import org.apache.gora.util.AvroUtils;
+import org.apache.gora.avro.PersistentDatumReader;
+import org.apache.gora.avro.PersistentDatumWriter;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -86,7 +89,17 @@ public class HBaseByteInterface {
};
};
-
+ /**
+ * Deserializes an array of bytes matching the given schema to the proper basic (enum, Utf8,...) or
+ * complex type (Persistent/Record).
+ *
+ * Does not handle <code>arrays/maps</code> if not inside a <code>record</code> type.
+ *
+ * @param schema Avro schema describing the expected data
+ * @param val array of bytes with the data serialized
+ * @return Enum|Utf8|ByteBuffer|Integer|Long|Float|Double|Boolean|Persistent|Null
+ * @throws IOException
+ */
@SuppressWarnings("rawtypes")
public static Object fromBytes(Schema schema, byte[] val) throws IOException {
Type type = schema.getType();
@@ -99,12 +112,57 @@ public class HBaseByteInterface {
case FLOAT: return Bytes.toFloat(val);
case DOUBLE: return Bytes.toDouble(val);
case BOOLEAN: return val[0] != 0;
+ case UNION:
+ // XXX Special case: When reading the top-level field of a record we must handle the
+ // special case ["null","type"] definitions: this will be written as if it was ["type"]
+ // if not in a special case, will execute "case RECORD".
+
+ // if 'val' is empty we ignore the special case (will match Null in "case RECORD")
+ if (schema.getTypes().size() == 2) {
+
+ // schema [type0, type1]
+ Type type0 = schema.getTypes().get(0).getType() ;
+ Type type1 = schema.getTypes().get(1).getType() ;
+
+ // Check if types are different and there's a "null", like ["null","type"] or ["type","null"]
+ if (!type0.equals(type1)
+ && ( type0.equals(Schema.Type.NULL)
+ || type1.equals(Schema.Type.NULL))) {
+
+ if (type0.equals(Schema.Type.NULL))
+ schema = schema.getTypes().get(1) ;
+ else
+ schema = schema.getTypes().get(0) ;
+
+ return fromBytes(schema, val) ; // Deserialize as if schema was ["type"]
+ }
+
+ }
+ // else
+ // type = [type0,type1] where type0=type1
+ // or val == null
+ // => deserialize like "case RECORD"
+
case RECORD:
Map<String, SpecificDatumReader<?>> readerMap = readerMaps.get();
- SpecificDatumReader<?> reader = readerMap.get(schema.getFullName());
- if (reader == null) {
- reader = new SpecificDatumReader(schema);
- readerMap.put(schema.getFullName(), reader);
+ PersistentDatumReader<?> reader = null ;
+
+ // For UNION schemas, must use a specific PersistentDatumReader
+ // from the readerMap since unions don't have own name
+ // (key name in map will be "UNION-type-type-...")
+ if (schema.getType().equals(Schema.Type.UNION)) {
+ reader = (PersistentDatumReader<?>)readerMap.get(String.valueOf(schema.hashCode()));
+ if (reader == null) {
+ reader = new PersistentDatumReader(schema, false);// ignore dirty bits
+ readerMap.put(String.valueOf(schema.hashCode()), reader);
+ }
+ } else {
+ // ELSE use reader for Record
+ reader = (PersistentDatumReader<?>)readerMap.get(schema.getFullName());
+ if (reader == null) {
+ reader = new PersistentDatumReader(schema, false);// ignore dirty bits
+ readerMap.put(schema.getFullName(), reader);
+ }
}
// initialize a decoder, possibly reusing previous one
@@ -116,11 +174,18 @@ public class HBaseByteInterface {
decoders.set(decoder);
}
- return reader.read(null, decoder);
+ return reader.read((Object)null, schema, decoder);
default: throw new RuntimeException("Unknown type: "+type);
}
}
+ /**
+ * Converts an array of bytes to the target <em>basic class</em>.
+ * @param clazz (Byte|Boolean|Short|Integer|Long|Float|Double|String|Utf8).class
+ * @param val array of bytes with the value
+ * @return an instance of <code>clazz</code> with the bytes in <code>val</code>
+ * deserialized with org.apache.hadoop.hbase.util.Bytes
+ */
@SuppressWarnings("unchecked")
public static <K> K fromBytes(Class<K> clazz, byte[] val) {
if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
@@ -145,6 +210,11 @@ public class HBaseByteInterface {
throw new RuntimeException("Can't parse data as class: " + clazz);
}
+ /**
+ * Converts an instance of a <em>basic class</em> to an array of bytes.
+ * @param o Instance of Enum|Byte|Boolean|Short|Integer|Long|Float|Double|String|Utf8
+ * @return array of bytes with <code>o</code> serialized with org.apache.hadoop.hbase.util.Bytes
+ */
public static byte[] toBytes(Object o) {
Class<?> clazz = o.getClass();
if (clazz.equals(Enum.class)) {
@@ -171,6 +241,14 @@ public class HBaseByteInterface {
throw new RuntimeException("Can't parse data as class: " + clazz);
}
+ /**
+ * Serializes an object following the given schema.
+ * Does not handle <code>array/map</code> if it is not inside a <code>record</code>
+ * @param o Utf8|ByteBuffer|Integer|Long|Float|Double|Boolean|Enum|Persistent
+ * @param schema The schema describing the object (or a compatible description)
+ * @return array of bytes of the serialized object
+ * @throws IOException
+ */
@SuppressWarnings({ "rawtypes", "unchecked" })
public static byte[] toBytes(Object o, Schema schema) throws IOException {
Type type = schema.getType();
@@ -183,12 +261,54 @@ public class HBaseByteInterface {
case DOUBLE: return Bytes.toBytes((Double)o);
case BOOLEAN: return (Boolean)o ? new byte[] {1} : new byte[] {0};
case ENUM: return new byte[] { (byte)((Enum<?>) o).ordinal() };
+ case UNION:
+ // XXX Special case: When writing the top-level field of a record we must handle the
+ // special case ["null","type"] definitions: this will be written as if it was ["type"]
+ // if not in a special case, will execute "case RECORD".
+
+ if (schema.getTypes().size() == 2) {
+
+ // schema [type0, type1]
+ Type type0 = schema.getTypes().get(0).getType() ;
+ Type type1 = schema.getTypes().get(1).getType() ;
+
+ // Check if types are different and there's a "null", like ["null","type"] or ["type","null"]
+ if (!type0.equals(type1)
+ && ( type0.equals(Schema.Type.NULL)
+ || type1.equals(Schema.Type.NULL))) {
+
+ if (o == null) return null ;
+
+ int index = GenericData.get().resolveUnion(schema, o);
+ schema = schema.getTypes().get(index) ;
+
+ return toBytes(o, schema) ; // Serialize as if schema was ["type"]
+ }
+
+ }
+ // else
+ // type = [type0,type1] where type0=type1
+ // => Serialize like "case RECORD" with Avro
+
case RECORD:
Map<String, SpecificDatumWriter<?>> writerMap = writerMaps.get();
- SpecificDatumWriter writer = writerMap.get(schema.getFullName());
- if (writer == null) {
- writer = new SpecificDatumWriter(schema);
- writerMap.put(schema.getFullName(),writer);
+ PersistentDatumWriter writer = null ;
+ // For UNION schemas, must use a specific PersistentDatumReader
+ // from the readerMap since unions don't have own name
+ // (key name in map will be "UNION-type-type-...")
+ if (schema.getType().equals(Schema.Type.UNION)) {
+ writer = (PersistentDatumWriter<?>) writerMap.get(String.valueOf(schema.hashCode()));
+ if (writer == null) {
+ writer = new PersistentDatumWriter(schema,false);// ignore dirty bits
+ writerMap.put(String.valueOf(schema.hashCode()),writer);
+ }
+ } else {
+ // ELSE use writer for Record
+ writer = (PersistentDatumWriter<?>) writerMap.get(schema.getFullName());
+ if (writer == null) {
+ writer = new PersistentDatumWriter(schema,false);// ignore dirty bits
+ writerMap.put(schema.getFullName(),writer);
+ }
}
BinaryEncoderWithStream encoder = encoders.get();
@@ -200,7 +320,7 @@ public class HBaseByteInterface {
ByteArrayOutputStream os = (ByteArrayOutputStream) encoder.getOut();
os.reset();
- writer.write(o, encoder);
+ writer.write(schema,o, encoder);
encoder.flush();
return os.toByteArray();
default: throw new RuntimeException("Unknown type: "+type);
Modified: gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml (original)
+++ gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml Fri Jun 14 00:55:24 2013
@@ -35,6 +35,8 @@
<field name="dateOfBirth" family="info" qualifier="db"/>
<field name="ssn" family="info" qualifier="sn"/>
<field name="salary" family="info" qualifier="sl"/>
+ <field name="boss" family="info" qualifier="bs"/>
+ <field name="webpage" family="info" qualifier="wp"/>
</class>
<class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" table="WebPage">
Modified: gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java (original)
+++ gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java Fri Jun 14 00:55:24 2013
@@ -19,21 +19,29 @@
package org.apache.gora.hbase.store;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Properties;
import junit.framework.Assert;
+import org.apache.avro.util.Utf8;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.gora.examples.WebPageDataCreator;
import org.apache.gora.examples.generated.Employee;
import org.apache.gora.examples.generated.WebPage;
import org.apache.gora.hbase.GoraHBaseTestDriver;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.store.DataStoreTestBase;
+import org.apache.gora.store.DataStoreTestUtil;
+import org.apache.gora.util.GoraException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
/**
* Test case for HBaseStore.
@@ -94,8 +102,14 @@ public class TestHBaseStore extends Data
}
+ /**
+ * Asserts that writing bytes actually works at low level in HBase.
+ * Checks writing null unions too.
+ */
@Override
public void assertPutBytes(byte[] contentBytes) throws IOException {
+
+ // Check first the parameter "contentBytes" if written+read right.
HTable table = new HTable("WebPage");
Get get = new Get(Bytes.toBytes("com.example/http"));
org.apache.hadoop.hbase.client.Result result = table.get(get);
@@ -103,7 +117,99 @@ public class TestHBaseStore extends Data
byte[] actualBytes = result.getValue(Bytes.toBytes("content"), null);
Assert.assertNotNull(actualBytes);
Assert.assertTrue(Arrays.equals(contentBytes, actualBytes));
+ table.close();
+
+ // Since "content" is an optional field, we are forced to reopen the DataStore
+ // to retrieve the union correctly
+
+ // Test writing+reading a null value. FIELD in HBASE MUST become DELETED
+ WebPage page = webPageStore.get("com.example/http") ;
+ page.setContent(null) ;
+ webPageStore.put("com.example/http", page) ;
+ webPageStore.close() ;
+ webPageStore = testDriver.createDataStore(String.class, WebPage.class);
+ page = webPageStore.get("com.example/http") ;
+ Assert.assertNull(page.getContent()) ;
+ // Check directly with HBase
+ table = new HTable("WebPage");
+ get = new Get(Bytes.toBytes("com.example/http"));
+ result = table.get(get);
+ actualBytes = result.getValue(Bytes.toBytes("content"), null);
+ Assert.assertNull(actualBytes);
+ table.close();
+
+ // Test writing+reading an empty bytes field. FIELD in HBASE MUST become EMPTY (byte[0])
+ page = webPageStore.get("com.example/http") ;
+ page.setContent(ByteBuffer.wrap("".getBytes())) ;
+ webPageStore.put("com.example/http", page) ;
+ webPageStore.close() ;
+ webPageStore = testDriver.createDataStore(String.class, WebPage.class);
+ page = webPageStore.get("com.example/http") ;
+ Assert.assertTrue(Arrays.equals("".getBytes(),page.getContent().array())) ;
+ // Check directly with HBase
+ table = new HTable("WebPage");
+ get = new Get(Bytes.toBytes("com.example/http"));
+ result = table.get(get);
+ actualBytes = result.getValue(Bytes.toBytes("content"), null);
+ Assert.assertNotNull(actualBytes);
+ Assert.assertEquals(0, actualBytes.length) ;
table.close();
+
+ }
+
+ /**
+ * Checks that when writing a top level union <code>['null','type']</code> the value is written in raw format
+ * @throws Exception
+ */
+ @Test
+ public void assertTopLevelUnions() throws Exception {
+ WebPage page = webPageStore.newPersistent();
+
+ // Write webpage data
+ page.setUrl(new Utf8("http://example.com"));
+ byte[] contentBytes = "example content in example.com".getBytes();
+ ByteBuffer buff = ByteBuffer.wrap(contentBytes);
+ page.setContent(buff);
+ webPageStore.put("com.example/http", page);
+ webPageStore.flush() ;
+
+ // Read directly from HBase
+ HTable table = new HTable("WebPage");
+ Get get = new Get(Bytes.toBytes("com.example/http"));
+ org.apache.hadoop.hbase.client.Result result = table.get(get);
+
+ byte[] bytesRead = result.getValue(Bytes.toBytes("content"), null);
+
+ Assert.assertNotNull(bytesRead) ;
+ Assert.assertTrue(Arrays.equals(bytesRead, contentBytes));
+ }
+
+ /**
+ * Checks that when writing a top level union <code>['null','type']</code> with the option <code>RAW_ROOT_FIELDS_OPTION=true</code>
+ * the column is not created, and when <code>RAW_ROOT_FIELDS_OPTION=false</code> the <code>null</code> value is serialized
+ * with Avro.
+ * @throws Exception
+ */
+ @Test
+ public void assertTopLevelUnionsNull() throws Exception {
+ WebPage page = webPageStore.newPersistent();
+
+ // Write webpage data
+ page.setUrl(new Utf8("http://example.com"));
+ page.setContent(null); // This won't change internal field status to dirty, so
+ page.setDirty("content") ; // need to change it manually
+ webPageStore.put("com.example/http", page);
+ webPageStore.flush() ;
+
+ // Read directly from HBase
+ HTable table = new HTable("WebPage");
+ Get get = new Get(Bytes.toBytes("com.example/http"));
+ org.apache.hadoop.hbase.client.Result result = table.get(get);
+
+ byte[] contentBytes = result.getValue(Bytes.toBytes("content"), null);
+
+ Assert.assertNull(webPageStore.get("com.example/http", new String[]{"content"})) ;
+ Assert.assertTrue(contentBytes == null || contentBytes.length == 0) ;
}
@Override