You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2014/06/04 18:36:23 UTC

[06/50] [abbrv] GORA-321. Merge GORA_94 into Gora trunk

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/gora-hbase/pom.xml b/gora-hbase/pom.xml
index d48cb32..2b3fd2b 100644
--- a/gora-hbase/pom.xml
+++ b/gora-hbase/pom.xml
@@ -100,6 +100,7 @@
         <dependency>
             <groupId>org.apache.gora</groupId>
             <artifactId>gora-core</artifactId>
+            <scope>compile</scope>
         </dependency>
 
         <dependency>
@@ -108,45 +109,49 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <!-- END of Gora Internal Dependencies -->
 
-        <!-- Hadoop Dependencies -->
         <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase</artifactId>
+            <scope>compile</scope>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase</artifactId>
-            <type>test-jar</type>
-        </dependency>
-
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
+            <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
+            <scope>compile</scope>
         </dependency>
 
         <!-- Misc Dependencies -->
         <dependency>
             <groupId>org.jdom</groupId>
             <artifactId>jdom</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <scope>compile</scope>
         </dependency>
 
         <!-- Logging Dependencies -->
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
+            <scope>compile</scope>
         </dependency>
 
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-simple</artifactId>
+            <scope>compile</scope>
         </dependency>
 
         <dependency>
             <groupId>log4j</groupId>
             <artifactId>log4j</artifactId>
+            <scope>runtime</scope>
 	        <exclusions>
 	          <exclusion>
                 <groupId>javax.jms</groupId>
@@ -154,18 +159,28 @@
 	          </exclusion>
             </exclusions>
         </dependency>
+        <!-- END of Logging Dependencies -->
 
         <!-- Testing Dependencies -->
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
+            <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-test</artifactId>
+            <scope>test</scope>
         </dependency>
-
+        
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <!-- END of Testing Dependencies -->
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java b/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java
index 5c876ad..ddd13aa 100644
--- a/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java
+++ b/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java
@@ -21,7 +21,6 @@ package org.apache.gora.hbase.query;
 import java.io.IOException;
 
 import org.apache.gora.hbase.store.HBaseStore;
-import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.hadoop.hbase.client.Get;

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java b/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java
index 0189954..3c06267 100644
--- a/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java
+++ b/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java
@@ -23,7 +23,6 @@ import static org.apache.gora.hbase.util.HBaseByteInterface.fromBytes;
 import java.io.IOException;
 
 import org.apache.gora.hbase.store.HBaseStore;
-import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.ResultBase;

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
index d07ff05..92f0591 100644
--- a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
+++ b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -36,21 +35,15 @@ import java.util.Set;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericArray;
 import org.apache.avro.util.Utf8;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.gora.hbase.query.HBaseGetResult;
 import org.apache.gora.hbase.query.HBaseQuery;
 import org.apache.gora.hbase.query.HBaseScannerResult;
 import org.apache.gora.hbase.store.HBaseMapping.HBaseMappingBuilder;
 import org.apache.gora.hbase.util.HBaseByteInterface;
 import org.apache.gora.hbase.util.HBaseFilterUtil;
-import org.apache.gora.persistency.ListGenericArray;
-import org.apache.gora.persistency.State;
-import org.apache.gora.persistency.StateManager;
-import org.apache.gora.persistency.StatefulHashMap;
-import org.apache.gora.persistency.StatefulMap;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
@@ -74,6 +67,8 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.jdom.Document;
 import org.jdom.Element;
 import org.jdom.input.SAXBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * DataStore for HBase. Thread safe.
@@ -135,7 +130,7 @@ implements Configurable {
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
-    
+
     // Set scanner caching option
     try {
       this.setScannerCaching(
@@ -146,7 +141,7 @@ implements Configurable {
       LOG.error("Can not load " + SCANNER_CACHING_PROPERTIES_KEY + " from gora.properties. Setting to default value: " + SCANNER_CACHING_PROPERTIES_DEFAULT, e) ;
       this.setScannerCaching(SCANNER_CACHING_PROPERTIES_DEFAULT) ; // Default value if something is wrong
     }
-    
+
     if(autoCreateSchema) {
       createSchema();
     }
@@ -225,126 +220,117 @@ implements Configurable {
   }
 
   /**
-   * {@inheritDoc}
-   * Serializes the Persistent data and saves in HBase.
-   * Topmost fields of the record are persisted in "raw" format (not avro serialized). This behavior happens
-   * in maps and arrays too.
+   * {@inheritDoc} Serializes the Persistent data and saves in HBase. Topmost
+   * fields of the record are persisted in "raw" format (not avro serialized).
+   * This behavior happens in maps and arrays too.
    * 
-   * ["null","type"] type (a.k.a. optional field) is persisted like as if it is ["type"], but the column get
-   * deleted if value==null (so value read after will be null).
+   * ["null","type"] type (a.k.a. optional field) is persisted like as if it is
+   * ["type"], but the column get deleted if value==null (so value read after
+   * will be null).
    * 
-   * @param persistent Record to be persisted in HBase
+   * @param persistent
+   *          Record to be persisted in HBase
    */
-  @SuppressWarnings({ "unchecked", "rawtypes" })
   @Override
   public void put(K key, T persistent) {
-    try{
+    try {
       Schema schema = persistent.getSchema();
-      StateManager stateManager = persistent.getStateManager();
       byte[] keyRaw = toBytes(key);
       Put put = new Put(keyRaw);
       Delete delete = new Delete(keyRaw);
-      boolean hasPuts = false;
-      boolean hasDeletes = false;
-      Iterator<Field> iter = schema.getFields().iterator();
-      for (int i = 0; iter.hasNext(); i++) {
-        Field field = iter.next();
-        if (!stateManager.isDirty(persistent, i)) {
+      List<Field> fields = schema.getFields();
+      for (int i = 1; i < fields.size(); i++) {
+        if (!persistent.isDirty(i)) {
           continue;
         }
-        Type type = field.schema().getType();
+        Field field = fields.get(i);
         Object o = persistent.get(i);
         HBaseColumn hcol = mapping.getColumn(field.name());
         if (hcol == null) {
-          throw new RuntimeException("HBase mapping for field ["+ persistent.getClass().getName() +
-              "#"+ field.name()+"] not found. Wrong gora-hbase-mapping.xml?");
-        }
-        switch(type) {
-          case MAP:
-            if(o instanceof StatefulMap) {
-              StatefulHashMap<Utf8, ?> map = (StatefulHashMap<Utf8, ?>) o;
-              for (Entry<Utf8, State> e : map.states().entrySet()) {
-                Utf8 mapKey = e.getKey();
-                switch (e.getValue()) {
-                  case DIRTY:
-                    byte[] qual = Bytes.toBytes(mapKey.toString());
-                    byte[] val = toBytes(map.get(mapKey), field.schema().getValueType());
-                    // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
-                    if (val == null) { // value == null => must delete the column
-                      delete.deleteColumn(hcol.getFamily(), qual);
-                      hasDeletes = true;
-                    } else {
-                      put.add(hcol.getFamily(), qual, val);
-                      hasPuts = true;
-                    }
-                    break;
-                  case DELETED:
-                    qual = Bytes.toBytes(mapKey.toString());
-                    hasDeletes = true;
-                    delete.deleteColumn(hcol.getFamily(), qual);
-                    break;
-                  default :
-                    break;
-                }
-              }
-            } else {
-              Set<Map.Entry> set = ((Map)o).entrySet();
-              for(Entry entry: set) {
-                byte[] qual = toBytes(entry.getKey());
-                byte[] val = toBytes(entry.getValue(), field.schema().getValueType());
-                // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
-                if (val == null) { // value == null => must delete the column
-                  delete.deleteColumn(hcol.getFamily(), qual);
-                  hasDeletes = true;
-                } else {
-                  put.add(hcol.getFamily(), qual, val);
-                  hasPuts = true;
-                }
-              }
-            }
-            break;
-          case ARRAY:
-            if(o instanceof GenericArray) {
-              GenericArray arr = (GenericArray) o;
-              int j=0;
-              for(Object item : arr) {
-                byte[] val = toBytes(item, field.schema().getElementType());
-                // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
-                if (val == null) { // value == null => must delete the column
-                  delete.deleteColumn(hcol.getFamily(), Bytes.toBytes(j++));
-                  hasDeletes = true;
-                } else {
-                  put.add(hcol.getFamily(), Bytes.toBytes(j++), val);
-                  hasPuts = true;
-                }
-              }
-            }
-            break;
-          default:
-            // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
-            byte[] serializedBytes = toBytes(o, field.schema()) ;
-            if (serializedBytes == null) { // value == null => must delete the column
-              delete.deleteColumn(hcol.getFamily(), hcol.getQualifier());
-              hasDeletes = true;
-            } else {
-              put.add(hcol.getFamily(), hcol.getQualifier(), serializedBytes);
-              hasPuts = true;
-            }
-            break;
+          throw new RuntimeException("HBase mapping for field ["
+              + persistent.getClass().getName() + "#" + field.name()
+              + "] not found. Wrong gora-hbase-mapping.xml?");
         }
+        addPutsAndDeletes(put, delete, o, field.schema().getType(),
+            field.schema(), hcol, hcol.getQualifier());
       }
-      if (hasPuts) {
+      if (put.size() > 0) {
         table.put(put);
       }
-      if (hasDeletes) {
+      if (delete.size() > 0) {
         table.delete(delete);
+        table.delete(delete);
+        table.delete(delete); // HBase sometimes does not delete arbitrarily
       }
-    } catch(IOException ex2){
+    } catch (IOException ex2) {
       LOG.error(ex2.getMessage());
       LOG.error(ex2.getStackTrace().toString());
     }
   }
 
+  private void addPutsAndDeletes(Put put, Delete delete, Object o, Type type,
+      Schema schema, HBaseColumn hcol, byte[] qualifier) throws IOException {
+    switch (type) {
+    case UNION:
+      if (isNullable(schema) && o == null) {
+        if (qualifier == null) {
+          delete.deleteFamily(hcol.getFamily());
+        } else {
+          delete.deleteColumn(hcol.getFamily(), qualifier);
+        }
+      } else {
+//        int index = GenericData.get().resolveUnion(schema, o);
+        int index = getResolvedUnionIndex(schema);
+        if (index > 1) {  //if more than 2 type in union, serialize directly for now
+          byte[] serializedBytes = toBytes(o, schema);
+          put.add(hcol.getFamily(), qualifier, serializedBytes);
+        } else {
+          Schema resolvedSchema = schema.getTypes().get(index);
+          addPutsAndDeletes(put, delete, o, resolvedSchema.getType(),
+              resolvedSchema, hcol, qualifier);
+        }
+      }
+      break;
+    case MAP:
+      // if it's a map that has been modified, then the content should be replaced by the new one
+      // This is because we don't know if the content has changed or not.
+      if (qualifier == null) {
+        delete.deleteFamily(hcol.getFamily());
+      } else {
+        delete.deleteColumn(hcol.getFamily(), qualifier);
+      }
+      @SuppressWarnings({ "rawtypes", "unchecked" })
+      Set<Entry> set = ((Map) o).entrySet();
+      for (@SuppressWarnings("rawtypes") Entry entry : set) {
+        byte[] qual = toBytes(entry.getKey());
+        addPutsAndDeletes(put, delete, entry.getValue(), schema.getValueType()
+            .getType(), schema.getValueType(), hcol, qual);
+      }
+      break;
+    case ARRAY:
+      List<?> array = (List<?>) o;
+      int j = 0;
+      for (Object item : array) {
+        addPutsAndDeletes(put, delete, item, schema.getElementType().getType(),
+            schema.getElementType(), hcol, Bytes.toBytes(j++));
+      }
+      break;
+    default:
+      byte[] serializedBytes = toBytes(o, schema);
+      put.add(hcol.getFamily(), qualifier, serializedBytes);
+      break;
+    }
+  }
+
+  private boolean isNullable(Schema unionSchema) {
+    for (Schema innerSchema : unionSchema.getTypes()) {
+      if (innerSchema.getType().equals(Schema.Type.NULL)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   public void delete(T obj) {
     throw new RuntimeException("Not implemented yet");
   }
@@ -373,8 +359,7 @@ implements Configurable {
       String[] fields = getFieldsToQuery(query.getFields());
       //find whether all fields are queried, which means that complete
       //rows will be deleted
-      boolean isAllFields = Arrays.equals(fields
-          , getBeanFactory().getCachedPersistent().getFields());
+      boolean isAllFields = Arrays.equals(fields, getFields());
   
       org.apache.gora.query.Result<K, T> result = null;
       result = query.execute();
@@ -519,18 +504,28 @@ implements Configurable {
             "Wrong gora-hbase-mapping.xml?");
       }
       Schema fieldSchema = fieldMap.get(f).schema();
-      switch (fieldSchema.getType()) {
-        case MAP:
-        case ARRAY:
-          get.addFamily(col.family); break;
-        default:
-          get.addColumn(col.family, col.qualifier); break;
-      }
+      addFamilyOrColumn(get, col, fieldSchema);
     }
   }
 
-  private void addFields(Scan scan, Query<K,T> query)
-  throws IOException {
+  private void addFamilyOrColumn(Get get, HBaseColumn col, Schema fieldSchema) {
+    switch (fieldSchema.getType()) {
+    case UNION:
+      int index = getResolvedUnionIndex(fieldSchema);
+      Schema resolvedSchema = fieldSchema.getTypes().get(index);
+      addFamilyOrColumn(get, col, resolvedSchema);
+      break;
+    case MAP:
+    case ARRAY:
+      get.addFamily(col.family);
+      break;
+    default:
+      get.addColumn(col.family, col.qualifier);
+      break;
+    }
+  }
+
+  private void addFields(Scan scan, Query<K, T> query) throws IOException {
     String[] fields = query.getFields();
     for (String f : fields) {
       HBaseColumn col = mapping.getColumn(f);
@@ -539,19 +534,30 @@ implements Configurable {
             "Wrong gora-hbase-mapping.xml?");
       }
       Schema fieldSchema = fieldMap.get(f).schema();
-      switch (fieldSchema.getType()) {
-        case MAP:
-        case ARRAY:
-          scan.addFamily(col.family); break;
-        default:
-          scan.addColumn(col.family, col.qualifier); break;
-      }
+      addFamilyOrColumn(scan, col, fieldSchema);
     }
   }
 
-  //TODO: HBase Get, Scan, Delete should extend some common interface with addFamily, etc
-  private void addFields(Delete delete, Query<K,T> query)
-    throws IOException {
+  private void addFamilyOrColumn(Scan scan, HBaseColumn col, Schema fieldSchema) {
+    switch (fieldSchema.getType()) {
+    case UNION:
+      int index = getResolvedUnionIndex(fieldSchema);
+      Schema resolvedSchema = fieldSchema.getTypes().get(index);
+      addFamilyOrColumn(scan, col, resolvedSchema);
+      break;
+    case MAP:
+    case ARRAY:
+      scan.addFamily(col.family);
+      break;
+    default:
+      scan.addColumn(col.family, col.qualifier);
+      break;
+    }
+  }
+
+  // TODO: HBase Get, Scan, Delete should extend some common interface with
+  // addFamily, etc
+  private void addFields(Delete delete, Query<K, T> query)    throws IOException {
     String[] fields = query.getFields();
     for (String f : fields) {
       HBaseColumn col = mapping.getColumn(f);
@@ -560,13 +566,25 @@ implements Configurable {
             "Wrong gora-hbase-mapping.xml?");
       }
       Schema fieldSchema = fieldMap.get(f).schema();
-      switch (fieldSchema.getType()) {
-        case MAP:
-        case ARRAY:
-          delete.deleteFamily(col.family); break;
-        default:
-          delete.deleteColumn(col.family, col.qualifier); break;
-      }
+      addFamilyOrColumn(delete, col, fieldSchema);
+    }
+  }
+
+  private void addFamilyOrColumn(Delete delete, HBaseColumn col,
+      Schema fieldSchema) {
+    switch (fieldSchema.getType()) {
+    case UNION:
+      int index = getResolvedUnionIndex(fieldSchema);
+      Schema resolvedSchema = fieldSchema.getTypes().get(index);
+      addFamilyOrColumn(delete, col, resolvedSchema);
+      break;
+    case MAP:
+    case ARRAY:
+      delete.deleteFamily(col.family);
+      break;
+    default:
+      delete.deleteColumn(col.family, col.qualifier);
+      break;
     }
   }
 
@@ -582,7 +600,6 @@ implements Configurable {
     }
   }
 
-  @SuppressWarnings({ "unchecked", "rawtypes" })
   /**
    * Creates a new Persistent instance with the values in 'result' for the fields listed.
    * @param result result form a HTable#get()
@@ -597,7 +614,6 @@ implements Configurable {
       return null;
 
     T persistent = newPersistent();
-    StateManager stateManager = persistent.getStateManager();
     for (String f : fields) {
       HBaseColumn col = mapping.getColumn(f);
       if (col == null) {
@@ -606,50 +622,90 @@ implements Configurable {
       }
       Field field = fieldMap.get(f);
       Schema fieldSchema = field.schema();
-      switch(fieldSchema.getType()) {
-        case MAP:
-          NavigableMap<byte[], byte[]> qualMap =
-            result.getNoVersionMap().get(col.getFamily());
-          if (qualMap == null) {
-            continue;
-          }
-          Schema valueSchema = fieldSchema.getValueType();
-          Map map = new HashMap();
-          for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
-            map.put(new Utf8(Bytes.toString(e.getKey())),
-                fromBytes(valueSchema, e.getValue()));
-          }
-          setField(persistent, field, map);
-          break;
-        case ARRAY:
-          qualMap = result.getFamilyMap(col.getFamily());
-          if (qualMap == null) {
-            continue;
-          }
-          valueSchema = fieldSchema.getElementType();
-          ArrayList arrayList = new ArrayList();
-          for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
-            arrayList.add(fromBytes(valueSchema, e.getValue()));
-          }
-          ListGenericArray arr = new ListGenericArray(fieldSchema, arrayList);
-          setField(persistent, field, arr);
-          break;
-        default:
-          byte[] val = result.getValue(col.getFamily(), col.getQualifier());
-          if (val == null) {
-            continue;
-          }
-          setField(persistent, field, val);
-          break;
-      }
+      setField(result,persistent, col, field, fieldSchema);
     }
-    stateManager.clearDirty(persistent);
+    persistent.clearDirty();
     return persistent;
   }
 
+  private void setField(Result result, T persistent, HBaseColumn col,
+      Field field, Schema fieldSchema) throws IOException {
+    switch (fieldSchema.getType()) {
+    case UNION:
+      int index = getResolvedUnionIndex(fieldSchema);
+      if (index > 1) { //if more than 2 type in union, deserialize directly for now
+        byte[] val = result.getValue(col.getFamily(), col.getQualifier());
+        if (val == null) {
+          return;
+        }
+        setField(persistent, field, val);
+      } else {
+        Schema resolvedSchema = fieldSchema.getTypes().get(index);
+        setField(result, persistent, col, field, resolvedSchema);
+      }
+      break;
+    case MAP:
+      NavigableMap<byte[], byte[]> qualMap = result.getNoVersionMap().get(
+          col.getFamily());
+      if (qualMap == null) {
+        return;
+      }
+      Schema valueSchema = fieldSchema.getValueType();
+      Map<Utf8, Object> map = new HashMap<Utf8, Object>();
+      for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
+        map.put(new Utf8(Bytes.toString(e.getKey())),
+            fromBytes(valueSchema, e.getValue()));
+      }
+      setField(persistent, field, map);
+      break;
+    case ARRAY:
+      qualMap = result.getFamilyMap(col.getFamily());
+      if (qualMap == null) {
+        return;
+      }
+      valueSchema = fieldSchema.getElementType();
+      ArrayList<Object> arrayList = new ArrayList<Object>();
+      DirtyListWrapper<Object> dirtyListWrapper = new DirtyListWrapper<Object>(arrayList);
+      for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
+        dirtyListWrapper.add(fromBytes(valueSchema, e.getValue()));
+      }
+      setField(persistent, field, arrayList);
+      break;
+    default:
+      byte[] val = result.getValue(col.getFamily(), col.getQualifier());
+      if (val == null) {
+        return;
+      }
+      setField(persistent, field, val);
+      break;
+    }
+  }
+
+  //TODO temporary solution, has to be changed after implementation of saving the index of union type
+  private int getResolvedUnionIndex(Schema unionScema) {
+    if (unionScema.getTypes().size() == 2) {
+
+      // schema [type0, type1]
+      Type type0 = unionScema.getTypes().get(0).getType();
+      Type type1 = unionScema.getTypes().get(1).getType();
+
+      // Check if types are different and there's a "null", like ["null","type"]
+      // or ["type","null"]
+      if (!type0.equals(type1)
+          && (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL))) {
+
+        if (type0.equals(Schema.Type.NULL))
+          return 1;
+        else
+          return 0;
+      }
+    }
+    return 2;
+  }
+
   @SuppressWarnings({ "unchecked", "rawtypes" })
   private void setField(T persistent, Field field, Map map) {
-    persistent.put(field.pos(), new StatefulHashMap(map));
+    persistent.put(field.pos(), new DirtyMapWrapper(map));
   }
 
   private void setField(T persistent, Field field, byte[] val)
@@ -657,9 +713,9 @@ implements Configurable {
     persistent.put(field.pos(), fromBytes(field.schema(), val));
   }
 
-  @SuppressWarnings("rawtypes")
-  private void setField(T persistent, Field field, GenericArray list) {
-    persistent.put(field.pos(), list);
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private void setField(T persistent, Field field, List list) {
+    persistent.put(field.pos(), new DirtyListWrapper(list));
   }
 
   @SuppressWarnings("unchecked")
@@ -725,7 +781,6 @@ implements Configurable {
             mappingBuilder.addField(fieldName, family, qualifier);
             mappingBuilder.addColumnFamily(tableName, family);
           }
-          
           //we found a matching key and value class definition,
           //do not continue on other class definitions
           break;
@@ -790,4 +845,4 @@ implements Configurable {
     this.scannerCaching = numRows ;
     return this ;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
index 3a99cad..5f549b3 100644
--- a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
+++ b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
@@ -19,12 +19,14 @@ package org.apache.gora.hbase.store;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
@@ -35,7 +37,11 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 
@@ -145,18 +151,6 @@ public class HBaseTableConnection implements HTableInterface {
   }
 
   @Override
-  public void batch(List<Row> actions, Object[] results) throws IOException,
-      InterruptedException {
-    getTable().batch(actions, results);
-  }
-
-  @Override
-  public Object[] batch(List<Row> actions) throws IOException,
-      InterruptedException {
-    return getTable().batch(actions);
-  }
-
-  @Override
   public Result get(Get get) throws IOException {
     return getTable().get(get);
   }
@@ -254,4 +248,78 @@ public class HBaseTableConnection implements HTableInterface {
   public void unlockRow(RowLock rl) throws IOException {
     getTable().unlockRow(rl);
   }
+
+  @Override
+  public void batch(List<? extends Row> actions, Object[] results)
+      throws IOException, InterruptedException {
+    // TODO Auto-generated method stub
+    getTable().batch(actions, results);
+    
+  }
+
+  @Override
+  public Object[] batch(List<? extends Row> actions) throws IOException,
+      InterruptedException {
+    // TODO Auto-generated method stub
+    return getTable().batch(actions);
+  }
+
+  @Override
+  public void mutateRow(RowMutations rm) throws IOException {
+    // TODO Auto-generated method stub
+    
+  }
+
+  @Override
+  public Result append(Append append) throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol,
+      byte[] row) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(
+      Class<T> protocol, byte[] startKey, byte[] endKey, Call<T, R> callable)
+      throws IOException, Throwable {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public <T extends CoprocessorProtocol, R> void coprocessorExec(
+      Class<T> protocol, byte[] startKey, byte[] endKey, Call<T, R> callable,
+      Callback<R> callback) throws IOException, Throwable {
+    // TODO Auto-generated method stub
+    
+  }
+
+  @Override
+  public void setAutoFlush(boolean autoFlush) {
+    // TODO Auto-generated method stub
+    
+  }
+
+  @Override
+  public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+    // TODO Auto-generated method stub
+    
+  }
+
+  @Override
+  public long getWriteBufferSize() {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public void setWriteBufferSize(long writeBufferSize) throws IOException {
+    // TODO Auto-generated method stub
+    
+  }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java b/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
index e4acd77..03dc339 100644
--- a/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
+++ b/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
@@ -20,10 +20,8 @@ package org.apache.gora.hbase.util;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
@@ -31,12 +29,13 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
+
 import org.apache.gora.util.AvroUtils;
-import org.apache.gora.avro.PersistentDatumReader;
-import org.apache.gora.avro.PersistentDatumWriter;
+
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -47,27 +46,13 @@ public class HBaseByteInterface {
   /**
    * Threadlocals maintaining reusable binary decoders and encoders.
    */
+  private static ThreadLocal<ByteArrayOutputStream> outputStream =
+      new ThreadLocal<ByteArrayOutputStream>();
+  
   public static final ThreadLocal<BinaryDecoder> decoders =
       new ThreadLocal<BinaryDecoder>();
-  public static final ThreadLocal<BinaryEncoderWithStream> encoders =
-      new ThreadLocal<BinaryEncoderWithStream>();
-  
-  /**
-   * A BinaryEncoder that exposes the outputstream so that it can be reset
-   * every time. (This is a workaround to reuse BinaryEncoder and the buffers,
-   * normally provided be EncoderFactory, but this class does not exist yet 
-   * in the current Avro version).
-   */
-  public static final class BinaryEncoderWithStream extends BinaryEncoder {
-    public BinaryEncoderWithStream(OutputStream out) {
-      super(out);
-    }
-    
-    protected OutputStream getOut() {
-      return out;
-    }
-  }
-  
+  public static final ThreadLocal<BinaryEncoder> encoders =
+      new ThreadLocal<BinaryEncoder>();
   /*
    * Create a threadlocal map for the datum readers and writers, because
    * they are not thread safe, at least not before Avro 1.4.0 (See AVRO-650).
@@ -75,23 +60,15 @@ public class HBaseByteInterface {
    * writer pair for every schema, instead of one for every thread.
    */
   
-  public static final ThreadLocal<Map<String, SpecificDatumReader<?>>> 
-    readerMaps = new ThreadLocal<Map<String, SpecificDatumReader<?>>>() {
-      protected Map<String,SpecificDatumReader<?>> initialValue() {
-        return new HashMap<String, SpecificDatumReader<?>>();
-      };
-  };
-  
-  public static final ThreadLocal<Map<String, SpecificDatumWriter<?>>> 
-    writerMaps = new ThreadLocal<Map<String, SpecificDatumWriter<?>>>() {
-      protected Map<String,SpecificDatumWriter<?>> initialValue() {
-        return new HashMap<String, SpecificDatumWriter<?>>();
-      };
-  };
+  public static final ConcurrentHashMap<String, SpecificDatumReader<?>> readerMap = 
+      new ConcurrentHashMap<String, SpecificDatumReader<?>>();
+     
+  public static final ConcurrentHashMap<String, SpecificDatumWriter<?>> writerMap = 
+      new ConcurrentHashMap<String, SpecificDatumWriter<?>>();
 
   /**
-   * Deserializes an array of bytes matching the given schema to the proper basic (enum, Utf8,...) or
-   * complex type (Persistent/Record).
+   * Deserializes an array of bytes matching the given schema to the proper basic 
+   * (enum, Utf8,...) or complex type (Persistent/Record).
    * 
    * Does not handle <code>arrays/maps</code> if not inside a <code>record</code> type.
    * 
@@ -100,7 +77,7 @@ public class HBaseByteInterface {
    * @return Enum|Utf8|ByteBuffer|Integer|Long|Float|Double|Boolean|Persistent|Null
    * @throws IOException
    */
-  @SuppressWarnings("rawtypes")
+  @SuppressWarnings({ "rawtypes" })
   public static Object fromBytes(Schema schema, byte[] val) throws IOException {
     Type type = schema.getType();
     switch (type) {
@@ -144,37 +121,28 @@ public class HBaseByteInterface {
       // => deserialize like "case RECORD"
 
     case RECORD:
-      Map<String, SpecificDatumReader<?>> readerMap = readerMaps.get();
-      PersistentDatumReader<?> reader = null ;
-            
-      // For UNION schemas, must use a specific PersistentDatumReader
+      // For UNION schemas, must use a specific SpecificDatumReader
       // from the readerMap since unions don't have own name
       // (key name in map will be "UNION-type-type-...")
-      if (schema.getType().equals(Schema.Type.UNION)) {
-        reader = (PersistentDatumReader<?>)readerMap.get(String.valueOf(schema.hashCode()));
-        if (reader == null) {
-          reader = new PersistentDatumReader(schema, false);// ignore dirty bits
-          readerMap.put(String.valueOf(schema.hashCode()), reader);
-        }
-      } else {
-        // ELSE use reader for Record
-        reader = (PersistentDatumReader<?>)readerMap.get(schema.getFullName());
-        if (reader == null) {
-          reader = new PersistentDatumReader(schema, false);// ignore dirty bits
-          readerMap.put(schema.getFullName(), reader);
+      String schemaId = schema.getType().equals(Schema.Type.UNION) ? String.valueOf(schema.hashCode()) : schema.getFullName();      
+      
+      SpecificDatumReader<?> reader = (SpecificDatumReader<?>)readerMap.get(schemaId);
+      if (reader == null) {
+        reader = new SpecificDatumReader(schema);// ignore dirty bits
+        SpecificDatumReader localReader=null;
+        if((localReader=readerMap.putIfAbsent(schemaId, reader))!=null) {
+          reader = localReader;
         }
       }
       
       // initialize a decoder, possibly reusing previous one
       BinaryDecoder decoderFromCache = decoders.get();
-      BinaryDecoder decoder=DecoderFactory.defaultFactory().
-          createBinaryDecoder(val, decoderFromCache);
+      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(val, null);
       // put in threadlocal cache if the initial get was empty
       if (decoderFromCache==null) {
         decoders.set(decoder);
       }
-      
-      return reader.read((Object)null, schema, decoder);
+      return reader.read(null, decoder);
     default: throw new RuntimeException("Unknown type: "+type);
     }
   }
@@ -255,7 +223,7 @@ public class HBaseByteInterface {
   public static byte[] toBytes(Object o, Schema schema) throws IOException {
     Type type = schema.getType();
     switch (type) {
-    case STRING:  return Bytes.toBytes(((Utf8)o).toString()); // TODO: maybe ((Utf8)o).getBytes(); ?
+    case STRING:  return Bytes.toBytes(((CharSequence)o).toString()); // TODO: maybe ((Utf8)o).getBytes(); ?
     case BYTES:   return ((ByteBuffer)o).array();
     case INT:     return Bytes.toBytes((Integer)o);
     case LONG:    return Bytes.toBytes((Long)o);
@@ -264,65 +232,26 @@ public class HBaseByteInterface {
     case BOOLEAN: return (Boolean)o ? new byte[] {1} : new byte[] {0};
     case ENUM:    return new byte[] { (byte)((Enum<?>) o).ordinal() };
     case UNION:
-      // XXX Special case: When writing the top-level field of a record we must handle the
-      // special case ["null","type"] definitions: this will be written as if it was ["type"]
-      // if not in a special case, will execute "case RECORD".
-      
-      if (schema.getTypes().size() == 2) {
-        
-        // schema [type0, type1]
-        Type type0 = schema.getTypes().get(0).getType() ;
-        Type type1 = schema.getTypes().get(1).getType() ;
-        
-        // Check if types are different and there's a "null", like ["null","type"] or ["type","null"]
-        if (!type0.equals(type1)
-            && (   type0.equals(Schema.Type.NULL)
-                || type1.equals(Schema.Type.NULL))) {
-
-          if (o == null) return null ;
-          
-          int index = GenericData.get().resolveUnion(schema, o);
-          schema = schema.getTypes().get(index) ;
-          
-          return toBytes(o, schema) ; // Serialize as if schema was ["type"] 
-        }
-        
-      }
-      // else
-      //   type = [type0,type1] where type0=type1
-      // => Serialize like "case RECORD" with Avro
-      
     case RECORD:
-      Map<String, SpecificDatumWriter<?>> writerMap = writerMaps.get();
-      PersistentDatumWriter writer = null ;
-      // For UNION schemas, must use a specific PersistentDatumReader
-      // from the readerMap since unions don't have own name
-      // (key name in map will be "UNION-type-type-...")
-      if (schema.getType().equals(Schema.Type.UNION)) {
-        writer = (PersistentDatumWriter<?>) writerMap.get(String.valueOf(schema.hashCode()));
-        if (writer == null) {
-          writer = new PersistentDatumWriter(schema,false);// ignore dirty bits
-          writerMap.put(String.valueOf(schema.hashCode()),writer);
-        }
-      } else {
-        // ELSE use writer for Record
-        writer = (PersistentDatumWriter<?>) writerMap.get(schema.getFullName());
-        if (writer == null) {
-          writer = new PersistentDatumWriter(schema,false);// ignore dirty bits
-          writerMap.put(schema.getFullName(),writer);
-        }
+      SpecificDatumWriter writer = (SpecificDatumWriter<?>) writerMap.get(schema.getFullName());
+      if (writer == null) {
+        writer = new SpecificDatumWriter(schema);// ignore dirty bits
+        writerMap.put(schema.getFullName(),writer);
       }
-      
-      BinaryEncoderWithStream encoder = encoders.get();
-      if (encoder == null) {
-        encoder = new BinaryEncoderWithStream(new ByteArrayOutputStream());
+
+      BinaryEncoder encoderFromCache = encoders.get();
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      outputStream.set(bos);
+      BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(bos, null);
+      if (encoderFromCache == null) {
         encoders.set(encoder);
       }
+
       //reset the buffers
-      ByteArrayOutputStream os = (ByteArrayOutputStream) encoder.getOut();
+      ByteArrayOutputStream os = outputStream.get();
       os.reset();
-      
-      writer.write(schema,o, encoder);
+
+      writer.write(o, encoder);
       encoder.flush();
       return os.toByteArray();
     default: throw new RuntimeException("Unknown type: "+type);

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/test/conf/gora-hbase-mapping.xml
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/conf/gora-hbase-mapping.xml b/gora-hbase/src/test/conf/gora-hbase-mapping.xml
index afb57f0..44f6be2 100644
--- a/gora-hbase/src/test/conf/gora-hbase-mapping.xml
+++ b/gora-hbase/src/test/conf/gora-hbase-mapping.xml
@@ -44,6 +44,7 @@
     <field name="content" family="content"/>
     <field name="parsedContent" family="parsedContent"/>
     <field name="outlinks" family="outlinks"/>
+    <field name="headers" family="headers"/>
     <field name="metadata" family="common" qualifier="metadata"/>
   </class>
 

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/test/conf/hbase-site.xml
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/conf/hbase-site.xml b/gora-hbase/src/test/conf/hbase-site.xml
index 5024e85..51e1346 100644
--- a/gora-hbase/src/test/conf/hbase-site.xml
+++ b/gora-hbase/src/test/conf/hbase-site.xml
@@ -119,7 +119,6 @@
     <description>
     Maximum desired file size for an HRegion.  If filesize exceeds
     value + (value / 2), the HRegion is split in two.  Default: 256M.
-
     Keep the maximum filesize small so we split more often in tests.
     </description>
   </property>
@@ -129,9 +128,14 @@
   </property>
   <property>
     <name>hbase.zookeeper.property.clientPort</name>
-    <value>21818</value>
+    <value>2181</value>
     <description>Property from ZooKeeper's config zoo.cfg.
     The port at which the clients will connect.
     </description>
   </property>
+  <property>
+    <name>hbase.zookeeper.quorum</name>
+    <value>localhost</value>
+    <description>The directory shared by region servers.</description>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java b/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java
index ea9317e..ca9b559 100644
--- a/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java
+++ b/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java
@@ -22,19 +22,22 @@ import org.apache.gora.GoraTestDriver;
 import org.apache.gora.hbase.store.HBaseStore;
 import org.apache.gora.hbase.util.HBaseClusterSingleton;
 import org.apache.hadoop.conf.Configuration;
-
-//HBase imports
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 
 /**
  * Helper class for third part tests using gora-hbase backend. 
  * @see GoraTestDriver
  */
 public class GoraHBaseTestDriver extends GoraTestDriver {
+
+  /**
+   * Cluster object used for testing.
+   */
   private static final HBaseClusterSingleton cluster = HBaseClusterSingleton.build(1);
 
+  /**
+   * Default Constructor.
+   */
   public GoraHBaseTestDriver() {
     super(HBaseStore.class);
   }
@@ -42,6 +45,7 @@ public class GoraHBaseTestDriver extends GoraTestDriver {
   @Override
   public void setUpClass() throws Exception {
     super.setUpClass();
+    conf = getConf();
     log.info("Setting up HBase Test Driver");
   }
 
@@ -50,28 +54,40 @@ public class GoraHBaseTestDriver extends GoraTestDriver {
     super.tearDownClass();
     log.info("Teardown HBase test driver");
   }
-  
+
   @Override
   public void setUp() throws Exception {
     cluster.truncateAllTables();
     // super.setUp() deletes all tables, but must only truncate in the right way -HBaseClusterSingleton-
     //super.setUp();
   }
-  
+
   @Override
   public void tearDown() throws Exception {
     // Do nothing. setUp() must ensure the right data.
   }
+
+  /**
+   * Deletes all tables from the MiniCluster
+   * @throws Exception in case some table is not able to be deleted.
+   */
   public void deleteAllTables() throws Exception {
     cluster.deleteAllTables();
   }
-  
+
+  /**
+   * Gets the configuration from the MiniCluster.
+   * @return Configuration from MiniCluster.
+   */
   public Configuration getConf() {
     return cluster.getHbaseTestingUtil().getConfiguration();
   }
-  
+
+  /**
+   * Gets HBaseTestingUtility from the MiniCluster object.
+   * @return HBaseTestingUtility object
+   */
   public HBaseTestingUtility getHbaseUtil() {
     return cluster.getHbaseTestingUtil();
   }
-  
-}		
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java b/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java
index 0c92f16..7cca479 100644
--- a/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java
+++ b/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java
@@ -18,7 +18,6 @@
 
 package org.apache.gora.hbase.mapreduce;
 
-import org.apache.gora.examples.generated.TokenDatum;
 import org.apache.gora.examples.generated.WebPage;
 import org.apache.gora.hbase.store.HBaseStore;
 import org.apache.gora.hbase.util.HBaseClusterSingleton;

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java b/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
index c521680..8dd319f 100644
--- a/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
+++ b/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
@@ -56,7 +56,7 @@ public class TestHBaseStore extends DataStoreTestBase {
   @Override
   public void setUp() throws Exception {
     super.setUp();
-    conf = getTestDriver().getHbaseUtil().getConfiguration();
+    conf = getTestDriver().getConf();
   }
     
   @SuppressWarnings("unchecked")
@@ -87,7 +87,7 @@ public class TestHBaseStore extends DataStoreTestBase {
 
   @Override
   public void assertPutArray() throws IOException { 
-    HTable table = new HTable("WebPage");
+    HTable table = new HTable(conf,"WebPage");
     Get get = new Get(Bytes.toBytes("com.example/http"));
     org.apache.hadoop.hbase.client.Result result = table.get(get);
     
@@ -109,7 +109,7 @@ public class TestHBaseStore extends DataStoreTestBase {
   public void assertPutBytes(byte[] contentBytes) throws IOException {    
 
     // Check first the parameter "contentBytes" if written+read right.
-    HTable table = new HTable("WebPage");
+    HTable table = new HTable(conf,"WebPage");
     Get get = new Get(Bytes.toBytes("com.example/http"));
     org.apache.hadoop.hbase.client.Result result = table.get(get);
     
@@ -130,14 +130,15 @@ public class TestHBaseStore extends DataStoreTestBase {
     page = webPageStore.get("com.example/http") ;
     assertNull(page.getContent()) ;
     // Check directly with HBase
-    table = new HTable("WebPage");
+    table = new HTable(conf,"WebPage");
     get = new Get(Bytes.toBytes("com.example/http"));
     result = table.get(get);
     actualBytes = result.getValue(Bytes.toBytes("content"), null);
     assertNull(actualBytes);
     table.close();
     
-    // Test writing+reading an empty bytes field. FIELD in HBASE MUST become EMPTY (byte[0])
+    // Test writing+reading an empty bytes field. FIELD in HBASE MUST 
+    // become EMPTY (byte[0])
     page = webPageStore.get("com.example/http") ;
     page.setContent(ByteBuffer.wrap("".getBytes())) ;
     webPageStore.put("com.example/http", page) ;
@@ -146,7 +147,7 @@ public class TestHBaseStore extends DataStoreTestBase {
     page = webPageStore.get("com.example/http") ;
     assertTrue(Arrays.equals("".getBytes(),page.getContent().array())) ;
     // Check directly with HBase
-    table = new HTable("WebPage");
+    table = new HTable(conf,"WebPage");
     get = new Get(Bytes.toBytes("com.example/http"));
     result = table.get(get);
     actualBytes = result.getValue(Bytes.toBytes("content"), null);
@@ -156,15 +157,16 @@ public class TestHBaseStore extends DataStoreTestBase {
   }
   
   /**
-   * Checks that when writing a top level union <code>['null','type']</code> the value is written in raw format
+   * Checks that when writing a top level union <code>['null','type']</code> 
+   * the value is written in raw format
    * @throws Exception
    */
   @Test
   public void assertTopLevelUnions() throws Exception {
     WebPage page = webPageStore.newPersistent();
-
+    
     // Write webpage data
-    page.setUrl(new Utf8("http://example.com"));
+    page.setUrl((CharSequence) new Utf8("http://example.com"));
     byte[] contentBytes = "example content in example.com".getBytes();
     ByteBuffer buff = ByteBuffer.wrap(contentBytes);
     page.setContent(buff);
@@ -172,7 +174,7 @@ public class TestHBaseStore extends DataStoreTestBase {
     webPageStore.flush() ;
     
     // Read directly from HBase
-    HTable table = new HTable("WebPage");
+    HTable table = new HTable(conf,"WebPage");
     Get get = new Get(Bytes.toBytes("com.example/http"));
     org.apache.hadoop.hbase.client.Result result = table.get(get);
 
@@ -180,11 +182,14 @@ public class TestHBaseStore extends DataStoreTestBase {
     
     assertNotNull(bytesRead) ;
     assertTrue(Arrays.equals(bytesRead, contentBytes));
+    table.close();
   }
   
   /**
-   * Checks that when writing a top level union <code>['null','type']</code> with the option <code>RAW_ROOT_FIELDS_OPTION=true</code>
-   * the column is not created, and when <code>RAW_ROOT_FIELDS_OPTION=false</code> the <code>null</code> value is serialized
+   * Checks that when writing a top level union <code>['null','type']</code> 
+   * with the option <code>RAW_ROOT_FIELDS_OPTION=true</code>
+   * the column is not created, and when <code>RAW_ROOT_FIELDS_OPTION=false</code> 
+   * the <code>null</code> value is serialized
    * with Avro.
    * @throws Exception
    */
@@ -193,17 +198,17 @@ public class TestHBaseStore extends DataStoreTestBase {
     WebPage page = webPageStore.newPersistent();
     
     // Write webpage data
-    page.setUrl(new Utf8("http://example.com"));
+    page.setUrl((CharSequence) new Utf8("http://example.com"));
     page.setContent(null);     // This won't change internal field status to dirty, so
     page.setDirty("content") ; // need to change it manually
     webPageStore.put("com.example/http", page);
     webPageStore.flush() ;
     
     // Read directly from HBase
-    HTable table = new HTable("WebPage");
+    HTable table = new HTable(conf,"WebPage");
     Get get = new Get(Bytes.toBytes("com.example/http"));
     org.apache.hadoop.hbase.client.Result result = table.get(get);
-        
+    table.close();
     byte[] contentBytes = result.getValue(Bytes.toBytes("content"), null);
 
     assertNull(webPageStore.get("com.example/http", new String[]{"content"})) ;
@@ -212,7 +217,7 @@ public class TestHBaseStore extends DataStoreTestBase {
   
   @Override
   public void assertPutMap() throws IOException {
-    HTable table = new HTable("WebPage");
+    HTable table = new HTable(conf,"WebPage");
     Get get = new Get(Bytes.toBytes("com.example/http"));
     org.apache.hadoop.hbase.client.Result result = table.get(get);
     

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/test/java/org/apache/gora/hbase/util/HBaseClusterSingleton.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/java/org/apache/gora/hbase/util/HBaseClusterSingleton.java b/gora-hbase/src/test/java/org/apache/gora/hbase/util/HBaseClusterSingleton.java
index fe1ae0e..d9888be 100644
--- a/gora-hbase/src/test/java/org/apache/gora/hbase/util/HBaseClusterSingleton.java
+++ b/gora-hbase/src/test/java/org/apache/gora/hbase/util/HBaseClusterSingleton.java
@@ -85,6 +85,9 @@ public final class HBaseClusterSingleton {
 
     htu.getConfiguration().setBoolean("dfs.support.append", true);
     htu.getConfiguration().setInt("zookeeper.session.timeout", 20000);
+    //htu.getConfiguration().set("hbase.zookeeper.quorum", "localhost");
+    //htu.getConfiguration().setInt("hbase.zookeeper.property.clientPort", 2181);
+    
     try {
       LOG.info("Start HBase mini cluster.");
       hbaseCluster = htu.startMiniCluster(numServers);

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java b/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java
index 138015a..1fcbc69 100644
--- a/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java
+++ b/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java
@@ -20,6 +20,7 @@ package org.apache.gora.hbase.util;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -28,53 +29,57 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.avro.util.Utf8;
+
 import org.apache.gora.examples.generated.Employee;
 import org.apache.gora.examples.generated.Metadata;
+
 import static org.junit.Assert.assertEquals;
+
 import org.junit.Test;
 
 public class TestHBaseByteInterface {
 
-  private static final Random RANDOM = new Random();
+  private static final Random RANDOM = new Random(0);
 
   @Test
   public void testEncodingDecoding() throws Exception {
     for (int i=0; i < 1000; i++) {
     
       //employer
-      Utf8 name = new Utf8("john");
+      CharSequence name = (CharSequence) new Utf8("john");
       long dateOfBirth = System.currentTimeMillis();
       int salary = 1337;
-      Utf8 ssn = new Utf8(String.valueOf(RANDOM.nextLong()));
+      CharSequence ssn = (CharSequence) new Utf8(String.valueOf(RANDOM.nextLong()));
       
-      Employee e = new Employee();
+      Employee e = Employee.newBuilder().build();
       e.setName(name);
       e.setDateOfBirth(dateOfBirth);
       e.setSalary(salary);
       e.setSsn(ssn);
       
-      byte[] employerBytes = HBaseByteInterface.toBytes(e, Employee._SCHEMA);
-      Employee e2 = (Employee) HBaseByteInterface.fromBytes(Employee._SCHEMA, 
+      byte[] employerBytes = HBaseByteInterface.toBytes(e, Employee.SCHEMA$);
+      Employee e2 = (Employee) HBaseByteInterface.fromBytes(Employee.SCHEMA$, 
           employerBytes);
       
       assertEquals(name, e2.getName());
-      assertEquals(dateOfBirth, e2.getDateOfBirth());
-      assertEquals(salary, e2.getSalary());
+      assertEquals(dateOfBirth, e2.getDateOfBirth().longValue());
+      assertEquals(salary, e2.getSalary().intValue());
       assertEquals(ssn, e2.getSsn());
       
       
       //metadata
-      Utf8 key = new Utf8("theKey");
-      Utf8 value = new Utf8("theValue " + RANDOM.nextLong());
-      
-      Metadata m = new Metadata();
-      m.putToData(key, value);
+      CharSequence key = (CharSequence) new Utf8("theKey");
+      CharSequence value = (CharSequence) new Utf8("theValue " + RANDOM.nextLong());
+      HashMap<CharSequence, CharSequence> data = new HashMap<CharSequence, CharSequence>();
+      data.put(key, value);
+      Metadata m = Metadata.newBuilder().build();
+      m.setData(data);
       
-      byte[] datumBytes = HBaseByteInterface.toBytes(m, Metadata._SCHEMA);
-      Metadata m2 = (Metadata) HBaseByteInterface.fromBytes(Metadata._SCHEMA, 
+      byte[] datumBytes = HBaseByteInterface.toBytes(m, Metadata.SCHEMA$);
+      Metadata m2 = (Metadata) HBaseByteInterface.fromBytes(Metadata.SCHEMA$, 
           datumBytes);
       
-      assertEquals(value, m2.getFromData(key));
+      assertEquals(value, m2.getData().get(key));
     }
   }
   

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-solr/pom.xml
----------------------------------------------------------------------
diff --git a/gora-solr/pom.xml b/gora-solr/pom.xml
index 3ad5307..56eaab9 100644
--- a/gora-solr/pom.xml
+++ b/gora-solr/pom.xml
@@ -142,6 +142,12 @@
     <dependency>
       <groupId>org.apache.solr</groupId>
       <artifactId>solr-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.restlet.jee</groupId>
+          <artifactId>org.restlet.ext.servlet</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.solr</groupId>
@@ -188,17 +194,30 @@
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-server</artifactId>
-      <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.eclipse.jetty.orbit</groupId>
+          <artifactId>javax.servlet</artifactId>
+        </exclusion>
+      </exclusions>
+      <scope>runtime</scope>
     </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-util</artifactId>
-      <scope>compile</scope>
+      <scope>runtime</scope>
     </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-webapp</artifactId>
-      <scope>compile</scope>
+      <scope>runtime</scope>
+    </dependency>
+    
+    <!-- ADDED TO AVOID PROBLEMS WITH JAVAX -->
+    <dependency>
+      <groupId>javax</groupId>
+      <artifactId>javaee-api</artifactId>
+      <version>7.0</version>
     </dependency>
 
     <!-- Testing Dependencies -->
@@ -210,6 +229,12 @@
     <dependency>
       <groupId>org.apache.solr</groupId>
       <artifactId>solr-test-framework</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-servlet</artifactId>
+        </exclusion>
+      </exclusions>
       <scope>test</scope>
     </dependency>
     <dependency>