You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2014/06/04 18:36:23 UTC
[06/50] [abbrv] GORA-321. Merge GORA_94 into Gora trunk
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/gora-hbase/pom.xml b/gora-hbase/pom.xml
index d48cb32..2b3fd2b 100644
--- a/gora-hbase/pom.xml
+++ b/gora-hbase/pom.xml
@@ -100,6 +100,7 @@
<dependency>
<groupId>org.apache.gora</groupId>
<artifactId>gora-core</artifactId>
+ <scope>compile</scope>
</dependency>
<dependency>
@@ -108,45 +109,49 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <!-- END of Gora Internal Dependencies -->
- <!-- Hadoop Dependencies -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
+ <scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
- <type>test-jar</type>
- </dependency>
-
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
+ <groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
+ <scope>compile</scope>
</dependency>
<!-- Misc Dependencies -->
<dependency>
<groupId>org.jdom</groupId>
<artifactId>jdom</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>compile</scope>
</dependency>
<!-- Logging Dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
+ <scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>javax.jms</groupId>
@@ -154,18 +159,28 @@
</exclusion>
</exclusions>
</dependency>
+ <!-- END of Logging Dependencies -->
<!-- Testing Dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-test</artifactId>
+ <scope>test</scope>
</dependency>
-
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <!-- END of Testing Dependencies -->
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java b/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java
index 5c876ad..ddd13aa 100644
--- a/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java
+++ b/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java
@@ -21,7 +21,6 @@ package org.apache.gora.hbase.query;
import java.io.IOException;
import org.apache.gora.hbase.store.HBaseStore;
-import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
import org.apache.hadoop.hbase.client.Get;
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java b/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java
index 0189954..3c06267 100644
--- a/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java
+++ b/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java
@@ -23,7 +23,6 @@ import static org.apache.gora.hbase.util.HBaseByteInterface.fromBytes;
import java.io.IOException;
import org.apache.gora.hbase.store.HBaseStore;
-import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
import org.apache.gora.query.impl.ResultBase;
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
index d07ff05..92f0591 100644
--- a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
+++ b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
@@ -25,7 +25,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -36,21 +35,15 @@ import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericArray;
import org.apache.avro.util.Utf8;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.gora.hbase.query.HBaseGetResult;
import org.apache.gora.hbase.query.HBaseQuery;
import org.apache.gora.hbase.query.HBaseScannerResult;
import org.apache.gora.hbase.store.HBaseMapping.HBaseMappingBuilder;
import org.apache.gora.hbase.util.HBaseByteInterface;
import org.apache.gora.hbase.util.HBaseFilterUtil;
-import org.apache.gora.persistency.ListGenericArray;
-import org.apache.gora.persistency.State;
-import org.apache.gora.persistency.StateManager;
-import org.apache.gora.persistency.StatefulHashMap;
-import org.apache.gora.persistency.StatefulMap;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
@@ -74,6 +67,8 @@ import org.apache.hadoop.hbase.util.Pair;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.input.SAXBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* DataStore for HBase. Thread safe.
@@ -135,7 +130,7 @@ implements Configurable {
} catch (Exception e) {
throw new RuntimeException(e);
}
-
+
// Set scanner caching option
try {
this.setScannerCaching(
@@ -146,7 +141,7 @@ implements Configurable {
LOG.error("Can not load " + SCANNER_CACHING_PROPERTIES_KEY + " from gora.properties. Setting to default value: " + SCANNER_CACHING_PROPERTIES_DEFAULT, e) ;
this.setScannerCaching(SCANNER_CACHING_PROPERTIES_DEFAULT) ; // Default value if something is wrong
}
-
+
if(autoCreateSchema) {
createSchema();
}
@@ -225,126 +220,117 @@ implements Configurable {
}
/**
- * {@inheritDoc}
- * Serializes the Persistent data and saves in HBase.
- * Topmost fields of the record are persisted in "raw" format (not avro serialized). This behavior happens
- * in maps and arrays too.
+ * {@inheritDoc} Serializes the Persistent data and saves in HBase. Topmost
+ * fields of the record are persisted in "raw" format (not avro serialized).
+ * This behavior happens in maps and arrays too.
*
- * ["null","type"] type (a.k.a. optional field) is persisted like as if it is ["type"], but the column get
- * deleted if value==null (so value read after will be null).
+ * A ["null","type"] union (a.k.a. optional field) is persisted as if it were
+ * ["type"], but the column gets deleted if value==null (so a value read
+ * afterwards will be null).
*
- * @param persistent Record to be persisted in HBase
+ * @param persistent
+ * Record to be persisted in HBase
*/
- @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void put(K key, T persistent) {
- try{
+ try {
Schema schema = persistent.getSchema();
- StateManager stateManager = persistent.getStateManager();
byte[] keyRaw = toBytes(key);
Put put = new Put(keyRaw);
Delete delete = new Delete(keyRaw);
- boolean hasPuts = false;
- boolean hasDeletes = false;
- Iterator<Field> iter = schema.getFields().iterator();
- for (int i = 0; iter.hasNext(); i++) {
- Field field = iter.next();
- if (!stateManager.isDirty(persistent, i)) {
+ List<Field> fields = schema.getFields();
+ for (int i = 1; i < fields.size(); i++) {
+ if (!persistent.isDirty(i)) {
continue;
}
- Type type = field.schema().getType();
+ Field field = fields.get(i);
Object o = persistent.get(i);
HBaseColumn hcol = mapping.getColumn(field.name());
if (hcol == null) {
- throw new RuntimeException("HBase mapping for field ["+ persistent.getClass().getName() +
- "#"+ field.name()+"] not found. Wrong gora-hbase-mapping.xml?");
- }
- switch(type) {
- case MAP:
- if(o instanceof StatefulMap) {
- StatefulHashMap<Utf8, ?> map = (StatefulHashMap<Utf8, ?>) o;
- for (Entry<Utf8, State> e : map.states().entrySet()) {
- Utf8 mapKey = e.getKey();
- switch (e.getValue()) {
- case DIRTY:
- byte[] qual = Bytes.toBytes(mapKey.toString());
- byte[] val = toBytes(map.get(mapKey), field.schema().getValueType());
- // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
- if (val == null) { // value == null => must delete the column
- delete.deleteColumn(hcol.getFamily(), qual);
- hasDeletes = true;
- } else {
- put.add(hcol.getFamily(), qual, val);
- hasPuts = true;
- }
- break;
- case DELETED:
- qual = Bytes.toBytes(mapKey.toString());
- hasDeletes = true;
- delete.deleteColumn(hcol.getFamily(), qual);
- break;
- default :
- break;
- }
- }
- } else {
- Set<Map.Entry> set = ((Map)o).entrySet();
- for(Entry entry: set) {
- byte[] qual = toBytes(entry.getKey());
- byte[] val = toBytes(entry.getValue(), field.schema().getValueType());
- // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
- if (val == null) { // value == null => must delete the column
- delete.deleteColumn(hcol.getFamily(), qual);
- hasDeletes = true;
- } else {
- put.add(hcol.getFamily(), qual, val);
- hasPuts = true;
- }
- }
- }
- break;
- case ARRAY:
- if(o instanceof GenericArray) {
- GenericArray arr = (GenericArray) o;
- int j=0;
- for(Object item : arr) {
- byte[] val = toBytes(item, field.schema().getElementType());
- // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
- if (val == null) { // value == null => must delete the column
- delete.deleteColumn(hcol.getFamily(), Bytes.toBytes(j++));
- hasDeletes = true;
- } else {
- put.add(hcol.getFamily(), Bytes.toBytes(j++), val);
- hasPuts = true;
- }
- }
- }
- break;
- default:
- // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
- byte[] serializedBytes = toBytes(o, field.schema()) ;
- if (serializedBytes == null) { // value == null => must delete the column
- delete.deleteColumn(hcol.getFamily(), hcol.getQualifier());
- hasDeletes = true;
- } else {
- put.add(hcol.getFamily(), hcol.getQualifier(), serializedBytes);
- hasPuts = true;
- }
- break;
+ throw new RuntimeException("HBase mapping for field ["
+ + persistent.getClass().getName() + "#" + field.name()
+ + "] not found. Wrong gora-hbase-mapping.xml?");
}
+ addPutsAndDeletes(put, delete, o, field.schema().getType(),
+ field.schema(), hcol, hcol.getQualifier());
}
- if (hasPuts) {
+ if (put.size() > 0) {
table.put(put);
}
- if (hasDeletes) {
+ if (delete.size() > 0) {
table.delete(delete);
+ table.delete(delete);
+ table.delete(delete); // HBase sometimes does not delete arbitrarily
}
- } catch(IOException ex2){
+ } catch (IOException ex2) {
LOG.error(ex2.getMessage());
LOG.error(ex2.getStackTrace().toString());
}
}
+ private void addPutsAndDeletes(Put put, Delete delete, Object o, Type type,
+ Schema schema, HBaseColumn hcol, byte[] qualifier) throws IOException {
+ switch (type) {
+ case UNION:
+ if (isNullable(schema) && o == null) {
+ if (qualifier == null) {
+ delete.deleteFamily(hcol.getFamily());
+ } else {
+ delete.deleteColumn(hcol.getFamily(), qualifier);
+ }
+ } else {
+// int index = GenericData.get().resolveUnion(schema, o);
+ int index = getResolvedUnionIndex(schema);
+ if (index > 1) { //if more than 2 type in union, serialize directly for now
+ byte[] serializedBytes = toBytes(o, schema);
+ put.add(hcol.getFamily(), qualifier, serializedBytes);
+ } else {
+ Schema resolvedSchema = schema.getTypes().get(index);
+ addPutsAndDeletes(put, delete, o, resolvedSchema.getType(),
+ resolvedSchema, hcol, qualifier);
+ }
+ }
+ break;
+ case MAP:
+ // The map field is dirty, so its stored content is deleted and replaced by the
+ // new one, because we don't know what part of the content has changed.
+ if (qualifier == null) {
+ delete.deleteFamily(hcol.getFamily());
+ } else {
+ delete.deleteColumn(hcol.getFamily(), qualifier);
+ }
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ Set<Entry> set = ((Map) o).entrySet();
+ for (@SuppressWarnings("rawtypes") Entry entry : set) {
+ byte[] qual = toBytes(entry.getKey());
+ addPutsAndDeletes(put, delete, entry.getValue(), schema.getValueType()
+ .getType(), schema.getValueType(), hcol, qual);
+ }
+ break;
+ case ARRAY:
+ List<?> array = (List<?>) o;
+ int j = 0;
+ for (Object item : array) {
+ addPutsAndDeletes(put, delete, item, schema.getElementType().getType(),
+ schema.getElementType(), hcol, Bytes.toBytes(j++));
+ }
+ break;
+ default:
+ byte[] serializedBytes = toBytes(o, schema);
+ put.add(hcol.getFamily(), qualifier, serializedBytes);
+ break;
+ }
+ }
+
+ private boolean isNullable(Schema unionSchema) {
+ for (Schema innerSchema : unionSchema.getTypes()) {
+ if (innerSchema.getType().equals(Schema.Type.NULL)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public void delete(T obj) {
throw new RuntimeException("Not implemented yet");
}
@@ -373,8 +359,7 @@ implements Configurable {
String[] fields = getFieldsToQuery(query.getFields());
//find whether all fields are queried, which means that complete
//rows will be deleted
- boolean isAllFields = Arrays.equals(fields
- , getBeanFactory().getCachedPersistent().getFields());
+ boolean isAllFields = Arrays.equals(fields, getFields());
org.apache.gora.query.Result<K, T> result = null;
result = query.execute();
@@ -519,18 +504,28 @@ implements Configurable {
"Wrong gora-hbase-mapping.xml?");
}
Schema fieldSchema = fieldMap.get(f).schema();
- switch (fieldSchema.getType()) {
- case MAP:
- case ARRAY:
- get.addFamily(col.family); break;
- default:
- get.addColumn(col.family, col.qualifier); break;
- }
+ addFamilyOrColumn(get, col, fieldSchema);
}
}
- private void addFields(Scan scan, Query<K,T> query)
- throws IOException {
+ private void addFamilyOrColumn(Get get, HBaseColumn col, Schema fieldSchema) {
+ switch (fieldSchema.getType()) {
+ case UNION:
+ int index = getResolvedUnionIndex(fieldSchema);
+ Schema resolvedSchema = fieldSchema.getTypes().get(index);
+ addFamilyOrColumn(get, col, resolvedSchema);
+ break;
+ case MAP:
+ case ARRAY:
+ get.addFamily(col.family);
+ break;
+ default:
+ get.addColumn(col.family, col.qualifier);
+ break;
+ }
+ }
+
+ private void addFields(Scan scan, Query<K, T> query) throws IOException {
String[] fields = query.getFields();
for (String f : fields) {
HBaseColumn col = mapping.getColumn(f);
@@ -539,19 +534,30 @@ implements Configurable {
"Wrong gora-hbase-mapping.xml?");
}
Schema fieldSchema = fieldMap.get(f).schema();
- switch (fieldSchema.getType()) {
- case MAP:
- case ARRAY:
- scan.addFamily(col.family); break;
- default:
- scan.addColumn(col.family, col.qualifier); break;
- }
+ addFamilyOrColumn(scan, col, fieldSchema);
}
}
- //TODO: HBase Get, Scan, Delete should extend some common interface with addFamily, etc
- private void addFields(Delete delete, Query<K,T> query)
- throws IOException {
+ private void addFamilyOrColumn(Scan scan, HBaseColumn col, Schema fieldSchema) {
+ switch (fieldSchema.getType()) {
+ case UNION:
+ int index = getResolvedUnionIndex(fieldSchema);
+ Schema resolvedSchema = fieldSchema.getTypes().get(index);
+ addFamilyOrColumn(scan, col, resolvedSchema);
+ break;
+ case MAP:
+ case ARRAY:
+ scan.addFamily(col.family);
+ break;
+ default:
+ scan.addColumn(col.family, col.qualifier);
+ break;
+ }
+ }
+
+ // TODO: HBase Get, Scan, Delete should extend some common interface with
+ // addFamily, etc
+ private void addFields(Delete delete, Query<K, T> query) throws IOException {
String[] fields = query.getFields();
for (String f : fields) {
HBaseColumn col = mapping.getColumn(f);
@@ -560,13 +566,25 @@ implements Configurable {
"Wrong gora-hbase-mapping.xml?");
}
Schema fieldSchema = fieldMap.get(f).schema();
- switch (fieldSchema.getType()) {
- case MAP:
- case ARRAY:
- delete.deleteFamily(col.family); break;
- default:
- delete.deleteColumn(col.family, col.qualifier); break;
- }
+ addFamilyOrColumn(delete, col, fieldSchema);
+ }
+ }
+
+ private void addFamilyOrColumn(Delete delete, HBaseColumn col,
+ Schema fieldSchema) {
+ switch (fieldSchema.getType()) {
+ case UNION:
+ int index = getResolvedUnionIndex(fieldSchema);
+ Schema resolvedSchema = fieldSchema.getTypes().get(index);
+ addFamilyOrColumn(delete, col, resolvedSchema);
+ break;
+ case MAP:
+ case ARRAY:
+ delete.deleteFamily(col.family);
+ break;
+ default:
+ delete.deleteColumn(col.family, col.qualifier);
+ break;
}
}
@@ -582,7 +600,6 @@ implements Configurable {
}
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
/**
* Creates a new Persistent instance with the values in 'result' for the fields listed.
* @param result result form a HTable#get()
@@ -597,7 +614,6 @@ implements Configurable {
return null;
T persistent = newPersistent();
- StateManager stateManager = persistent.getStateManager();
for (String f : fields) {
HBaseColumn col = mapping.getColumn(f);
if (col == null) {
@@ -606,50 +622,90 @@ implements Configurable {
}
Field field = fieldMap.get(f);
Schema fieldSchema = field.schema();
- switch(fieldSchema.getType()) {
- case MAP:
- NavigableMap<byte[], byte[]> qualMap =
- result.getNoVersionMap().get(col.getFamily());
- if (qualMap == null) {
- continue;
- }
- Schema valueSchema = fieldSchema.getValueType();
- Map map = new HashMap();
- for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
- map.put(new Utf8(Bytes.toString(e.getKey())),
- fromBytes(valueSchema, e.getValue()));
- }
- setField(persistent, field, map);
- break;
- case ARRAY:
- qualMap = result.getFamilyMap(col.getFamily());
- if (qualMap == null) {
- continue;
- }
- valueSchema = fieldSchema.getElementType();
- ArrayList arrayList = new ArrayList();
- for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
- arrayList.add(fromBytes(valueSchema, e.getValue()));
- }
- ListGenericArray arr = new ListGenericArray(fieldSchema, arrayList);
- setField(persistent, field, arr);
- break;
- default:
- byte[] val = result.getValue(col.getFamily(), col.getQualifier());
- if (val == null) {
- continue;
- }
- setField(persistent, field, val);
- break;
- }
+ setField(result,persistent, col, field, fieldSchema);
}
- stateManager.clearDirty(persistent);
+ persistent.clearDirty();
return persistent;
}
+ private void setField(Result result, T persistent, HBaseColumn col,
+ Field field, Schema fieldSchema) throws IOException {
+ switch (fieldSchema.getType()) {
+ case UNION:
+ int index = getResolvedUnionIndex(fieldSchema);
+ if (index > 1) { //if more than 2 type in union, deserialize directly for now
+ byte[] val = result.getValue(col.getFamily(), col.getQualifier());
+ if (val == null) {
+ return;
+ }
+ setField(persistent, field, val);
+ } else {
+ Schema resolvedSchema = fieldSchema.getTypes().get(index);
+ setField(result, persistent, col, field, resolvedSchema);
+ }
+ break;
+ case MAP:
+ NavigableMap<byte[], byte[]> qualMap = result.getNoVersionMap().get(
+ col.getFamily());
+ if (qualMap == null) {
+ return;
+ }
+ Schema valueSchema = fieldSchema.getValueType();
+ Map<Utf8, Object> map = new HashMap<Utf8, Object>();
+ for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
+ map.put(new Utf8(Bytes.toString(e.getKey())),
+ fromBytes(valueSchema, e.getValue()));
+ }
+ setField(persistent, field, map);
+ break;
+ case ARRAY:
+ qualMap = result.getFamilyMap(col.getFamily());
+ if (qualMap == null) {
+ return;
+ }
+ valueSchema = fieldSchema.getElementType();
+ ArrayList<Object> arrayList = new ArrayList<Object>();
+ DirtyListWrapper<Object> dirtyListWrapper = new DirtyListWrapper<Object>(arrayList);
+ for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
+ dirtyListWrapper.add(fromBytes(valueSchema, e.getValue()));
+ }
+ setField(persistent, field, arrayList);
+ break;
+ default:
+ byte[] val = result.getValue(col.getFamily(), col.getQualifier());
+ if (val == null) {
+ return;
+ }
+ setField(persistent, field, val);
+ break;
+ }
+ }
+
+ // TODO: temporary solution; has to be changed once saving the index of the union type is implemented
+ private int getResolvedUnionIndex(Schema unionScema) {
+ if (unionScema.getTypes().size() == 2) {
+
+ // schema [type0, type1]
+ Type type0 = unionScema.getTypes().get(0).getType();
+ Type type1 = unionScema.getTypes().get(1).getType();
+
+ // Check if types are different and there's a "null", like ["null","type"]
+ // or ["type","null"]
+ if (!type0.equals(type1)
+ && (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL))) {
+
+ if (type0.equals(Schema.Type.NULL))
+ return 1;
+ else
+ return 0;
+ }
+ }
+ return 2;
+ }
+
@SuppressWarnings({ "unchecked", "rawtypes" })
private void setField(T persistent, Field field, Map map) {
- persistent.put(field.pos(), new StatefulHashMap(map));
+ persistent.put(field.pos(), new DirtyMapWrapper(map));
}
private void setField(T persistent, Field field, byte[] val)
@@ -657,9 +713,9 @@ implements Configurable {
persistent.put(field.pos(), fromBytes(field.schema(), val));
}
- @SuppressWarnings("rawtypes")
- private void setField(T persistent, Field field, GenericArray list) {
- persistent.put(field.pos(), list);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private void setField(T persistent, Field field, List list) {
+ persistent.put(field.pos(), new DirtyListWrapper(list));
}
@SuppressWarnings("unchecked")
@@ -725,7 +781,6 @@ implements Configurable {
mappingBuilder.addField(fieldName, family, qualifier);
mappingBuilder.addColumnFamily(tableName, family);
}
-
//we found a matching key and value class definition,
//do not continue on other class definitions
break;
@@ -790,4 +845,4 @@ implements Configurable {
this.scannerCaching = numRows ;
return this ;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
index 3a99cad..5f549b3 100644
--- a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
+++ b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
@@ -19,12 +19,14 @@ package org.apache.gora.hbase.store;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
@@ -35,7 +37,11 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -145,18 +151,6 @@ public class HBaseTableConnection implements HTableInterface {
}
@Override
- public void batch(List<Row> actions, Object[] results) throws IOException,
- InterruptedException {
- getTable().batch(actions, results);
- }
-
- @Override
- public Object[] batch(List<Row> actions) throws IOException,
- InterruptedException {
- return getTable().batch(actions);
- }
-
- @Override
public Result get(Get get) throws IOException {
return getTable().get(get);
}
@@ -254,4 +248,78 @@ public class HBaseTableConnection implements HTableInterface {
public void unlockRow(RowLock rl) throws IOException {
getTable().unlockRow(rl);
}
+
+ @Override
+ public void batch(List<? extends Row> actions, Object[] results)
+ throws IOException, InterruptedException {
+ // Delegates to the underlying table.
+ getTable().batch(actions, results);
+
+ }
+
+ @Override
+ public Object[] batch(List<? extends Row> actions) throws IOException,
+ InterruptedException {
+ // Delegates to the underlying table.
+ return getTable().batch(actions);
+ }
+
+ @Override
+ public void mutateRow(RowMutations rm) throws IOException {
+ // TODO: not implemented - the mutations are silently ignored instead of being applied
+
+ }
+
+ @Override
+ public Result append(Append append) throws IOException {
+ // TODO: not implemented - nothing is appended and null is returned
+ return null;
+ }
+
+ @Override
+ public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol,
+ byte[] row) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(
+ Class<T> protocol, byte[] startKey, byte[] endKey, Call<T, R> callable)
+ throws IOException, Throwable {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public <T extends CoprocessorProtocol, R> void coprocessorExec(
+ Class<T> protocol, byte[] startKey, byte[] endKey, Call<T, R> callable,
+ Callback<R> callback) throws IOException, Throwable {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setAutoFlush(boolean autoFlush) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public long getWriteBufferSize() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void setWriteBufferSize(long writeBufferSize) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
}
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java b/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
index e4acd77..03dc339 100644
--- a/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
+++ b/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
@@ -20,10 +20,8 @@ package org.apache.gora.hbase.util;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
@@ -31,12 +29,13 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
+
import org.apache.gora.util.AvroUtils;
-import org.apache.gora.avro.PersistentDatumReader;
-import org.apache.gora.avro.PersistentDatumWriter;
+
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -47,27 +46,13 @@ public class HBaseByteInterface {
/**
* Threadlocals maintaining reusable binary decoders and encoders.
*/
+ private static ThreadLocal<ByteArrayOutputStream> outputStream =
+ new ThreadLocal<ByteArrayOutputStream>();
+
public static final ThreadLocal<BinaryDecoder> decoders =
new ThreadLocal<BinaryDecoder>();
- public static final ThreadLocal<BinaryEncoderWithStream> encoders =
- new ThreadLocal<BinaryEncoderWithStream>();
-
- /**
- * A BinaryEncoder that exposes the outputstream so that it can be reset
- * every time. (This is a workaround to reuse BinaryEncoder and the buffers,
- * normally provided be EncoderFactory, but this class does not exist yet
- * in the current Avro version).
- */
- public static final class BinaryEncoderWithStream extends BinaryEncoder {
- public BinaryEncoderWithStream(OutputStream out) {
- super(out);
- }
-
- protected OutputStream getOut() {
- return out;
- }
- }
-
+ public static final ThreadLocal<BinaryEncoder> encoders =
+ new ThreadLocal<BinaryEncoder>();
/*
* Create a threadlocal map for the datum readers and writers, because
* they are not thread safe, at least not before Avro 1.4.0 (See AVRO-650).
@@ -75,23 +60,15 @@ public class HBaseByteInterface {
* writer pair for every schema, instead of one for every thread.
*/
- public static final ThreadLocal<Map<String, SpecificDatumReader<?>>>
- readerMaps = new ThreadLocal<Map<String, SpecificDatumReader<?>>>() {
- protected Map<String,SpecificDatumReader<?>> initialValue() {
- return new HashMap<String, SpecificDatumReader<?>>();
- };
- };
-
- public static final ThreadLocal<Map<String, SpecificDatumWriter<?>>>
- writerMaps = new ThreadLocal<Map<String, SpecificDatumWriter<?>>>() {
- protected Map<String,SpecificDatumWriter<?>> initialValue() {
- return new HashMap<String, SpecificDatumWriter<?>>();
- };
- };
+ public static final ConcurrentHashMap<String, SpecificDatumReader<?>> readerMap =
+ new ConcurrentHashMap<String, SpecificDatumReader<?>>();
+
+ public static final ConcurrentHashMap<String, SpecificDatumWriter<?>> writerMap =
+ new ConcurrentHashMap<String, SpecificDatumWriter<?>>();
/**
- * Deserializes an array of bytes matching the given schema to the proper basic (enum, Utf8,...) or
- * complex type (Persistent/Record).
+ * Deserializes an array of bytes matching the given schema to the proper basic
+ * (enum, Utf8,...) or complex type (Persistent/Record).
*
* Does not handle <code>arrays/maps</code> if not inside a <code>record</code> type.
*
@@ -100,7 +77,7 @@ public class HBaseByteInterface {
* @return Enum|Utf8|ByteBuffer|Integer|Long|Float|Double|Boolean|Persistent|Null
* @throws IOException
*/
- @SuppressWarnings("rawtypes")
+ @SuppressWarnings({ "rawtypes" })
public static Object fromBytes(Schema schema, byte[] val) throws IOException {
Type type = schema.getType();
switch (type) {
@@ -144,37 +121,28 @@ public class HBaseByteInterface {
// => deserialize like "case RECORD"
case RECORD:
- Map<String, SpecificDatumReader<?>> readerMap = readerMaps.get();
- PersistentDatumReader<?> reader = null ;
-
- // For UNION schemas, must use a specific PersistentDatumReader
+ // For UNION schemas, must use a specific SpecificDatumReader
// from the readerMap since unions don't have own name
// (key name in map will be "UNION-type-type-...")
- if (schema.getType().equals(Schema.Type.UNION)) {
- reader = (PersistentDatumReader<?>)readerMap.get(String.valueOf(schema.hashCode()));
- if (reader == null) {
- reader = new PersistentDatumReader(schema, false);// ignore dirty bits
- readerMap.put(String.valueOf(schema.hashCode()), reader);
- }
- } else {
- // ELSE use reader for Record
- reader = (PersistentDatumReader<?>)readerMap.get(schema.getFullName());
- if (reader == null) {
- reader = new PersistentDatumReader(schema, false);// ignore dirty bits
- readerMap.put(schema.getFullName(), reader);
+ String schemaId = schema.getType().equals(Schema.Type.UNION) ? String.valueOf(schema.hashCode()) : schema.getFullName();
+
+ SpecificDatumReader<?> reader = (SpecificDatumReader<?>)readerMap.get(schemaId);
+ if (reader == null) {
+ reader = new SpecificDatumReader(schema);// ignore dirty bits
+ SpecificDatumReader localReader=null;
+ if((localReader=readerMap.putIfAbsent(schemaId, reader))!=null) {
+ reader = localReader;
}
}
// initialize a decoder, possibly reusing previous one
BinaryDecoder decoderFromCache = decoders.get();
- BinaryDecoder decoder=DecoderFactory.defaultFactory().
- createBinaryDecoder(val, decoderFromCache);
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(val, null);
// put in threadlocal cache if the initial get was empty
if (decoderFromCache==null) {
decoders.set(decoder);
}
-
- return reader.read((Object)null, schema, decoder);
+ return reader.read(null, decoder);
default: throw new RuntimeException("Unknown type: "+type);
}
}
@@ -255,7 +223,7 @@ public class HBaseByteInterface {
public static byte[] toBytes(Object o, Schema schema) throws IOException {
Type type = schema.getType();
switch (type) {
- case STRING: return Bytes.toBytes(((Utf8)o).toString()); // TODO: maybe ((Utf8)o).getBytes(); ?
+ case STRING: return Bytes.toBytes(((CharSequence)o).toString()); // TODO: maybe ((Utf8)o).getBytes(); ?
case BYTES: return ((ByteBuffer)o).array();
case INT: return Bytes.toBytes((Integer)o);
case LONG: return Bytes.toBytes((Long)o);
@@ -264,65 +232,26 @@ public class HBaseByteInterface {
case BOOLEAN: return (Boolean)o ? new byte[] {1} : new byte[] {0};
case ENUM: return new byte[] { (byte)((Enum<?>) o).ordinal() };
case UNION:
- // XXX Special case: When writing the top-level field of a record we must handle the
- // special case ["null","type"] definitions: this will be written as if it was ["type"]
- // if not in a special case, will execute "case RECORD".
-
- if (schema.getTypes().size() == 2) {
-
- // schema [type0, type1]
- Type type0 = schema.getTypes().get(0).getType() ;
- Type type1 = schema.getTypes().get(1).getType() ;
-
- // Check if types are different and there's a "null", like ["null","type"] or ["type","null"]
- if (!type0.equals(type1)
- && ( type0.equals(Schema.Type.NULL)
- || type1.equals(Schema.Type.NULL))) {
-
- if (o == null) return null ;
-
- int index = GenericData.get().resolveUnion(schema, o);
- schema = schema.getTypes().get(index) ;
-
- return toBytes(o, schema) ; // Serialize as if schema was ["type"]
- }
-
- }
- // else
- // type = [type0,type1] where type0=type1
- // => Serialize like "case RECORD" with Avro
-
case RECORD:
- Map<String, SpecificDatumWriter<?>> writerMap = writerMaps.get();
- PersistentDatumWriter writer = null ;
- // For UNION schemas, must use a specific PersistentDatumReader
- // from the readerMap since unions don't have own name
- // (key name in map will be "UNION-type-type-...")
- if (schema.getType().equals(Schema.Type.UNION)) {
- writer = (PersistentDatumWriter<?>) writerMap.get(String.valueOf(schema.hashCode()));
- if (writer == null) {
- writer = new PersistentDatumWriter(schema,false);// ignore dirty bits
- writerMap.put(String.valueOf(schema.hashCode()),writer);
- }
- } else {
- // ELSE use writer for Record
- writer = (PersistentDatumWriter<?>) writerMap.get(schema.getFullName());
- if (writer == null) {
- writer = new PersistentDatumWriter(schema,false);// ignore dirty bits
- writerMap.put(schema.getFullName(),writer);
- }
+ SpecificDatumWriter writer = (SpecificDatumWriter<?>) writerMap.get(schema.getFullName());
+ if (writer == null) {
+ writer = new SpecificDatumWriter(schema);// ignore dirty bits
+ writerMap.put(schema.getFullName(),writer);
}
-
- BinaryEncoderWithStream encoder = encoders.get();
- if (encoder == null) {
- encoder = new BinaryEncoderWithStream(new ByteArrayOutputStream());
+
+ BinaryEncoder encoderFromCache = encoders.get();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ outputStream.set(bos);
+ BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(bos, null);
+ if (encoderFromCache == null) {
encoders.set(encoder);
}
+
//reset the buffers
- ByteArrayOutputStream os = (ByteArrayOutputStream) encoder.getOut();
+ ByteArrayOutputStream os = outputStream.get();
os.reset();
-
- writer.write(schema,o, encoder);
+
+ writer.write(o, encoder);
encoder.flush();
return os.toByteArray();
default: throw new RuntimeException("Unknown type: "+type);
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/test/conf/gora-hbase-mapping.xml
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/conf/gora-hbase-mapping.xml b/gora-hbase/src/test/conf/gora-hbase-mapping.xml
index afb57f0..44f6be2 100644
--- a/gora-hbase/src/test/conf/gora-hbase-mapping.xml
+++ b/gora-hbase/src/test/conf/gora-hbase-mapping.xml
@@ -44,6 +44,7 @@
<field name="content" family="content"/>
<field name="parsedContent" family="parsedContent"/>
<field name="outlinks" family="outlinks"/>
+ <field name="headers" family="headers"/>
<field name="metadata" family="common" qualifier="metadata"/>
</class>
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/test/conf/hbase-site.xml
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/conf/hbase-site.xml b/gora-hbase/src/test/conf/hbase-site.xml
index 5024e85..51e1346 100644
--- a/gora-hbase/src/test/conf/hbase-site.xml
+++ b/gora-hbase/src/test/conf/hbase-site.xml
@@ -119,7 +119,6 @@
<description>
Maximum desired file size for an HRegion. If filesize exceeds
value + (value / 2), the HRegion is split in two. Default: 256M.
-
Keep the maximum filesize small so we split more often in tests.
</description>
</property>
@@ -129,9 +128,14 @@
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
- <value>21818</value>
+ <value>2181</value>
<description>Property from ZooKeeper's config zoo.cfg.
The port at which the clients will connect.
</description>
</property>
+ <property>
+ <name>hbase.zookeeper.quorum</name>
+ <value>localhost</value>
+ <description>Comma separated list of servers in the ZooKeeper quorum.</description>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java b/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java
index ea9317e..ca9b559 100644
--- a/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java
+++ b/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java
@@ -22,19 +22,22 @@ import org.apache.gora.GoraTestDriver;
import org.apache.gora.hbase.store.HBaseStore;
import org.apache.gora.hbase.util.HBaseClusterSingleton;
import org.apache.hadoop.conf.Configuration;
-
-//HBase imports
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
/**
* Helper class for third-party tests using the gora-hbase backend.
* @see GoraTestDriver
*/
public class GoraHBaseTestDriver extends GoraTestDriver {
+
+ /**
+ * Cluster object used for testing.
+ */
private static final HBaseClusterSingleton cluster = HBaseClusterSingleton.build(1);
+ /**
+ * Default Constructor.
+ */
public GoraHBaseTestDriver() {
super(HBaseStore.class);
}
@@ -42,6 +45,7 @@ public class GoraHBaseTestDriver extends GoraTestDriver {
@Override
public void setUpClass() throws Exception {
super.setUpClass();
+ conf = getConf();
log.info("Setting up HBase Test Driver");
}
@@ -50,28 +54,40 @@ public class GoraHBaseTestDriver extends GoraTestDriver {
super.tearDownClass();
log.info("Teardown HBase test driver");
}
-
+
@Override
public void setUp() throws Exception {
cluster.truncateAllTables();
// super.setUp() deletes all tables, but here they must only be truncated, which is done through HBaseClusterSingleton
//super.setUp();
}
-
+
@Override
public void tearDown() throws Exception {
// Do nothing. setUp() must ensure the right data.
}
+
+ /**
+ * Deletes all tables from the MiniCluster
+ * @throws Exception in case some table is not able to be deleted.
+ */
public void deleteAllTables() throws Exception {
cluster.deleteAllTables();
}
-
+
+ /**
+ * Gets the configuration from the MiniCluster.
+ * @return Configuration from MiniCluster.
+ */
public Configuration getConf() {
return cluster.getHbaseTestingUtil().getConfiguration();
}
-
+
+ /**
+ * Gets HBaseTestingUtility from the MiniCluster object.
+ * @return HBaseTestingUtility object
+ */
public HBaseTestingUtility getHbaseUtil() {
return cluster.getHbaseTestingUtil();
}
-
-}
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java b/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java
index 0c92f16..7cca479 100644
--- a/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java
+++ b/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java
@@ -18,7 +18,6 @@
package org.apache.gora.hbase.mapreduce;
-import org.apache.gora.examples.generated.TokenDatum;
import org.apache.gora.examples.generated.WebPage;
import org.apache.gora.hbase.store.HBaseStore;
import org.apache.gora.hbase.util.HBaseClusterSingleton;
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java b/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
index c521680..8dd319f 100644
--- a/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
+++ b/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
@@ -56,7 +56,7 @@ public class TestHBaseStore extends DataStoreTestBase {
@Override
public void setUp() throws Exception {
super.setUp();
- conf = getTestDriver().getHbaseUtil().getConfiguration();
+ conf = getTestDriver().getConf();
}
@SuppressWarnings("unchecked")
@@ -87,7 +87,7 @@ public class TestHBaseStore extends DataStoreTestBase {
@Override
public void assertPutArray() throws IOException {
- HTable table = new HTable("WebPage");
+ HTable table = new HTable(conf,"WebPage");
Get get = new Get(Bytes.toBytes("com.example/http"));
org.apache.hadoop.hbase.client.Result result = table.get(get);
@@ -109,7 +109,7 @@ public class TestHBaseStore extends DataStoreTestBase {
public void assertPutBytes(byte[] contentBytes) throws IOException {
// Check first the parameter "contentBytes" if written+read right.
- HTable table = new HTable("WebPage");
+ HTable table = new HTable(conf,"WebPage");
Get get = new Get(Bytes.toBytes("com.example/http"));
org.apache.hadoop.hbase.client.Result result = table.get(get);
@@ -130,14 +130,15 @@ public class TestHBaseStore extends DataStoreTestBase {
page = webPageStore.get("com.example/http") ;
assertNull(page.getContent()) ;
// Check directly with HBase
- table = new HTable("WebPage");
+ table = new HTable(conf,"WebPage");
get = new Get(Bytes.toBytes("com.example/http"));
result = table.get(get);
actualBytes = result.getValue(Bytes.toBytes("content"), null);
assertNull(actualBytes);
table.close();
- // Test writing+reading an empty bytes field. FIELD in HBASE MUST become EMPTY (byte[0])
+ // Test writing+reading an empty bytes field. FIELD in HBASE MUST
+ // become EMPTY (byte[0])
page = webPageStore.get("com.example/http") ;
page.setContent(ByteBuffer.wrap("".getBytes())) ;
webPageStore.put("com.example/http", page) ;
@@ -146,7 +147,7 @@ public class TestHBaseStore extends DataStoreTestBase {
page = webPageStore.get("com.example/http") ;
assertTrue(Arrays.equals("".getBytes(),page.getContent().array())) ;
// Check directly with HBase
- table = new HTable("WebPage");
+ table = new HTable(conf,"WebPage");
get = new Get(Bytes.toBytes("com.example/http"));
result = table.get(get);
actualBytes = result.getValue(Bytes.toBytes("content"), null);
@@ -156,15 +157,16 @@ public class TestHBaseStore extends DataStoreTestBase {
}
/**
- * Checks that when writing a top level union <code>['null','type']</code> the value is written in raw format
+ * Checks that when writing a top level union <code>['null','type']</code>
+ * the value is written in raw format
* @throws Exception
*/
@Test
public void assertTopLevelUnions() throws Exception {
WebPage page = webPageStore.newPersistent();
-
+
// Write webpage data
- page.setUrl(new Utf8("http://example.com"));
+ page.setUrl((CharSequence) new Utf8("http://example.com"));
byte[] contentBytes = "example content in example.com".getBytes();
ByteBuffer buff = ByteBuffer.wrap(contentBytes);
page.setContent(buff);
@@ -172,7 +174,7 @@ public class TestHBaseStore extends DataStoreTestBase {
webPageStore.flush() ;
// Read directly from HBase
- HTable table = new HTable("WebPage");
+ HTable table = new HTable(conf,"WebPage");
Get get = new Get(Bytes.toBytes("com.example/http"));
org.apache.hadoop.hbase.client.Result result = table.get(get);
@@ -180,11 +182,14 @@ public class TestHBaseStore extends DataStoreTestBase {
assertNotNull(bytesRead) ;
assertTrue(Arrays.equals(bytesRead, contentBytes));
+ table.close();
}
/**
- * Checks that when writing a top level union <code>['null','type']</code> with the option <code>RAW_ROOT_FIELDS_OPTION=true</code>
- * the column is not created, and when <code>RAW_ROOT_FIELDS_OPTION=false</code> the <code>null</code> value is serialized
+ * Checks that when writing a top level union <code>['null','type']</code>
+ * with the option <code>RAW_ROOT_FIELDS_OPTION=true</code>
+ * the column is not created, and when <code>RAW_ROOT_FIELDS_OPTION=false</code>
+ * the <code>null</code> value is serialized
* with Avro.
* @throws Exception
*/
@@ -193,17 +198,17 @@ public class TestHBaseStore extends DataStoreTestBase {
WebPage page = webPageStore.newPersistent();
// Write webpage data
- page.setUrl(new Utf8("http://example.com"));
+ page.setUrl((CharSequence) new Utf8("http://example.com"));
page.setContent(null); // This won't change internal field status to dirty, so
page.setDirty("content") ; // need to change it manually
webPageStore.put("com.example/http", page);
webPageStore.flush() ;
// Read directly from HBase
- HTable table = new HTable("WebPage");
+ HTable table = new HTable(conf,"WebPage");
Get get = new Get(Bytes.toBytes("com.example/http"));
org.apache.hadoop.hbase.client.Result result = table.get(get);
-
+ table.close();
byte[] contentBytes = result.getValue(Bytes.toBytes("content"), null);
assertNull(webPageStore.get("com.example/http", new String[]{"content"})) ;
@@ -212,7 +217,7 @@ public class TestHBaseStore extends DataStoreTestBase {
@Override
public void assertPutMap() throws IOException {
- HTable table = new HTable("WebPage");
+ HTable table = new HTable(conf,"WebPage");
Get get = new Get(Bytes.toBytes("com.example/http"));
org.apache.hadoop.hbase.client.Result result = table.get(get);
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/test/java/org/apache/gora/hbase/util/HBaseClusterSingleton.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/java/org/apache/gora/hbase/util/HBaseClusterSingleton.java b/gora-hbase/src/test/java/org/apache/gora/hbase/util/HBaseClusterSingleton.java
index fe1ae0e..d9888be 100644
--- a/gora-hbase/src/test/java/org/apache/gora/hbase/util/HBaseClusterSingleton.java
+++ b/gora-hbase/src/test/java/org/apache/gora/hbase/util/HBaseClusterSingleton.java
@@ -85,6 +85,9 @@ public final class HBaseClusterSingleton {
htu.getConfiguration().setBoolean("dfs.support.append", true);
htu.getConfiguration().setInt("zookeeper.session.timeout", 20000);
+ //htu.getConfiguration().set("hbase.zookeeper.quorum", "localhost");
+ //htu.getConfiguration().setInt("hbase.zookeeper.property.clientPort", 2181);
+
try {
LOG.info("Start HBase mini cluster.");
hbaseCluster = htu.startMiniCluster(numServers);
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java b/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java
index 138015a..1fcbc69 100644
--- a/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java
+++ b/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java
@@ -20,6 +20,7 @@ package org.apache.gora.hbase.util;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
@@ -28,53 +29,57 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.avro.util.Utf8;
+
import org.apache.gora.examples.generated.Employee;
import org.apache.gora.examples.generated.Metadata;
+
import static org.junit.Assert.assertEquals;
+
import org.junit.Test;
public class TestHBaseByteInterface {
- private static final Random RANDOM = new Random();
+ private static final Random RANDOM = new Random(0);
@Test
public void testEncodingDecoding() throws Exception {
for (int i=0; i < 1000; i++) {
//employer
- Utf8 name = new Utf8("john");
+ CharSequence name = (CharSequence) new Utf8("john");
long dateOfBirth = System.currentTimeMillis();
int salary = 1337;
- Utf8 ssn = new Utf8(String.valueOf(RANDOM.nextLong()));
+ CharSequence ssn = (CharSequence) new Utf8(String.valueOf(RANDOM.nextLong()));
- Employee e = new Employee();
+ Employee e = Employee.newBuilder().build();
e.setName(name);
e.setDateOfBirth(dateOfBirth);
e.setSalary(salary);
e.setSsn(ssn);
- byte[] employerBytes = HBaseByteInterface.toBytes(e, Employee._SCHEMA);
- Employee e2 = (Employee) HBaseByteInterface.fromBytes(Employee._SCHEMA,
+ byte[] employerBytes = HBaseByteInterface.toBytes(e, Employee.SCHEMA$);
+ Employee e2 = (Employee) HBaseByteInterface.fromBytes(Employee.SCHEMA$,
employerBytes);
assertEquals(name, e2.getName());
- assertEquals(dateOfBirth, e2.getDateOfBirth());
- assertEquals(salary, e2.getSalary());
+ assertEquals(dateOfBirth, e2.getDateOfBirth().longValue());
+ assertEquals(salary, e2.getSalary().intValue());
assertEquals(ssn, e2.getSsn());
//metadata
- Utf8 key = new Utf8("theKey");
- Utf8 value = new Utf8("theValue " + RANDOM.nextLong());
-
- Metadata m = new Metadata();
- m.putToData(key, value);
+ CharSequence key = (CharSequence) new Utf8("theKey");
+ CharSequence value = (CharSequence) new Utf8("theValue " + RANDOM.nextLong());
+ HashMap<CharSequence, CharSequence> data = new HashMap<CharSequence, CharSequence>();
+ data.put(key, value);
+ Metadata m = Metadata.newBuilder().build();
+ m.setData(data);
- byte[] datumBytes = HBaseByteInterface.toBytes(m, Metadata._SCHEMA);
- Metadata m2 = (Metadata) HBaseByteInterface.fromBytes(Metadata._SCHEMA,
+ byte[] datumBytes = HBaseByteInterface.toBytes(m, Metadata.SCHEMA$);
+ Metadata m2 = (Metadata) HBaseByteInterface.fromBytes(Metadata.SCHEMA$,
datumBytes);
- assertEquals(value, m2.getFromData(key));
+ assertEquals(value, m2.getData().get(key));
}
}
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-solr/pom.xml
----------------------------------------------------------------------
diff --git a/gora-solr/pom.xml b/gora-solr/pom.xml
index 3ad5307..56eaab9 100644
--- a/gora-solr/pom.xml
+++ b/gora-solr/pom.xml
@@ -142,6 +142,12 @@
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.restlet.jee</groupId>
+ <artifactId>org.restlet.ext.servlet</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
@@ -188,17 +194,30 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
- <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty.orbit</groupId>
+ <artifactId>javax.servlet</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
- <scope>compile</scope>
+ <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
- <scope>compile</scope>
+ <scope>runtime</scope>
+ </dependency>
+
+ <!-- ADDED TO AVOID PROBLEMS WITH JAVAX -->
+ <dependency>
+ <groupId>javax</groupId>
+ <artifactId>javaee-api</artifactId>
+ <version>7.0</version>
</dependency>
<!-- Testing Dependencies -->
@@ -210,6 +229,12 @@
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-test-framework</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ </exclusion>
+ </exclusions>
<scope>test</scope>
</dependency>
<dependency>