You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by rm...@apache.org on 2014/04/12 21:21:56 UTC

svn commit: r1586888 [8/10] - in /gora/trunk: ./ bin/ gora-accumulo/ gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/ gora-accumulo/src/main/java/org/apache/gora/accumulo/query/ gora-accumulo/src/main/java/org/apache/gora/accumulo/store/ ...

Modified: gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java (original)
+++ gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java Sat Apr 12 19:21:53 2014
@@ -42,7 +42,7 @@ import static org.junit.Assert.assertFal
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
 
-import org.apache.avro.generic.GenericArray;
+import org.apache.avro.Schema.Field;
 import org.apache.avro.util.Utf8;
 import org.apache.gora.examples.WebPageDataCreator;
 import org.apache.gora.examples.generated.Employee;
@@ -53,9 +53,9 @@ import org.apache.gora.persistency.impl.
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
+import org.apache.gora.util.AvroUtils;
 import org.apache.gora.util.ByteUtils;
 import org.apache.gora.util.StringUtils;
-import org.junit.Test;
 
 /**
  * Test utilities for DataStores. This utility class provides everything
@@ -89,7 +89,7 @@ public class DataStoreTestUtil {
   public static <K> Employee createEmployee(
       DataStore<K, Employee> dataStore) throws IOException, Exception {
 
-    Employee employee = dataStore.newPersistent();
+    Employee employee = Employee.newBuilder().build();
     employee.setName(new Utf8("Random Joe"));
     employee.setDateOfBirth( System.currentTimeMillis() - 20L *  YEAR_IN_MS );
     employee.setSalary(100000);
@@ -97,12 +97,22 @@ public class DataStoreTestUtil {
     return employee;
   }
 
-  public static <K> Employee createBoss(
-      DataStore<K, Employee> dataStore) throws IOException, Exception {
+  private static <K> WebPage createWebPage(DataStore<K, Employee> dataStore) {
+    WebPage webpage = WebPage.newBuilder().build();
+    webpage.setUrl(new Utf8("url.."));
+    webpage.setContent(ByteBuffer.wrap("test content".getBytes()));
+    webpage.setParsedContent(new ArrayList<CharSequence>());
+    Metadata metadata = Metadata.newBuilder().build();
+    webpage.setMetadata(metadata);
+    return webpage;
+  }
 
-    Employee employee = dataStore.newPersistent();
+  public static <K> Employee createBoss(DataStore<K, Employee> dataStore)
+      throws IOException, Exception {
+
+    Employee employee = Employee.newBuilder().build();
     employee.setName(new Utf8("Random boss"));
-    employee.setDateOfBirth( System.currentTimeMillis() - 22L *  YEAR_IN_MS );
+    employee.setDateOfBirth(System.currentTimeMillis() - 22L * YEAR_IN_MS);
     employee.setSalary(1000000);
     employee.setSsn(new Utf8("202020202020"));
     return employee;
@@ -159,25 +169,23 @@ public class DataStoreTestUtil {
     dataStore.put(ssn, employee);
     dataStore.flush();
 
-    Employee after = dataStore.get(ssn, Employee._ALL_FIELDS);
+    Employee after = dataStore.get(ssn, AvroUtils.getSchemaFieldNames(Employee.SCHEMA$));
 
-    assertEquals(employee, after);
+    assertEqualEmployeeObjects(employee, after);
   }
 
-
   public static void testGetEmployeeRecursive(DataStore<String, Employee> dataStore)
     throws IOException, Exception {
 
     Employee employee = DataStoreTestUtil.createEmployee(dataStore);
     Employee boss = DataStoreTestUtil.createBoss(dataStore);
-    employee.setBoss(boss) ;
+    employee.setBoss(boss);
     
     String ssn = employee.getSsn().toString();
     dataStore.put(ssn, employee);
     dataStore.flush();
-    Employee after = dataStore.get(ssn, Employee._ALL_FIELDS);
-    assertEquals(employee, after);
-    assertEquals(boss, after.getBoss()) ;
+    Employee after = dataStore.get(ssn, AvroUtils.getSchemaFieldNames(Employee.SCHEMA$));
+    assertEqualEmployeeObjects(employee, after);
   }
 
   public static void testGetEmployeeDoubleRecursive(DataStore<String, Employee> dataStore)
@@ -193,10 +201,8 @@ public class DataStoreTestUtil {
       String ssn = employee.getSsn().toString();
       dataStore.put(ssn, employee);
       dataStore.flush();
-      Employee after = dataStore.get(ssn, Employee._ALL_FIELDS);
-      assertEquals(employee, after);
-      assertEquals(boss, after.getBoss()) ;
-      assertEquals(uberBoss, ((Employee)after.getBoss()).getBoss()) ;
+      Employee after = dataStore.get(ssn, AvroUtils.getSchemaFieldNames(Employee.SCHEMA$));
+      assertEqualEmployeeObjects(employee, after);
     }
   
   public static void testGetEmployeeNested(DataStore<String, Employee> dataStore)
@@ -207,7 +213,8 @@ public class DataStoreTestUtil {
     
     webpage.setUrl(new Utf8("url..")) ;
     webpage.setContent(ByteBuffer.wrap("test content".getBytes())) ;
-    Metadata metadata = new BeanFactoryImpl<String,Metadata>(String.class,Metadata.class).newPersistent() ;
+    webpage.setParsedContent(new ArrayList<CharSequence>());
+    Metadata metadata = new BeanFactoryImpl<String,Metadata>(String.class,Metadata.class).newPersistent();
     webpage.setMetadata(metadata) ;
     employee.setWebpage(webpage) ;
     
@@ -215,9 +222,9 @@ public class DataStoreTestUtil {
    
     dataStore.put(ssn, employee);
     dataStore.flush();
-    Employee after = dataStore.get(ssn, Employee._ALL_FIELDS);
-    assertEquals(employee, after);
-    assertEquals(webpage, after.getWebpage()) ;
+    Employee after = dataStore.get(ssn, AvroUtils.getSchemaFieldNames(Employee.SCHEMA$));
+    assertEqualEmployeeObjects(employee, after);
+    assertEqualWebPageObjects(webpage, after.getWebpage());
   }
   
   public static void testGetEmployee3UnionField(DataStore<String, Employee> dataStore)
@@ -225,12 +232,12 @@ public class DataStoreTestUtil {
 
     Employee employee = DataStoreTestUtil.createEmployee(dataStore);
     employee.setBoss(new Utf8("Real boss")) ;
-    
+
     String ssn = employee.getSsn().toString();
     dataStore.put(ssn, employee);
     dataStore.flush();
-    Employee after = dataStore.get(ssn, Employee._ALL_FIELDS);
-    assertEquals(employee, after);
+    Employee after = dataStore.get(ssn, AvroUtils.getSchemaFieldNames(Employee.SCHEMA$));
+    assertEqualEmployeeObjects(employee, after);
     assertEquals("Real boss", ((Utf8)after.getBoss()).toString()) ;
   }
   
@@ -243,32 +250,139 @@ public class DataStoreTestUtil {
   public static void testGetEmployeeWithFields(DataStore<String, Employee> dataStore)
     throws IOException, Exception {
     Employee employee = DataStoreTestUtil.createEmployee(dataStore);
+    WebPage webpage = createWebPage(dataStore);
+    employee.setWebpage(webpage);
+    Employee boss = createBoss(dataStore);
+    employee.setBoss(boss);
     String ssn = employee.getSsn().toString();
     dataStore.put(ssn, employee);
     dataStore.flush();
 
-    // XXX See GORA-216: special case until later reviewed.
-    // Like in K-V stores, if retrieved column does not exists ([webpage] case),
-    // get() must return 'null'.
-    // We prepare an actual weird synthetic test.
-    
-    // String[] fields = employee.getFields();
-    String[] fields = {"name","dateOfBirth","ssn","salary"} ;
-    
+    String[] fields = AvroUtils.getPersistentFieldNames(employee);
     for(Set<String> subset : StringUtils.powerset(fields)) {
       if(subset.isEmpty())
         continue;
       Employee after = dataStore.get(ssn, subset.toArray(new String[subset.size()]));
-      Employee expected = new Employee();
+      Employee expected = Employee.newBuilder().build();
       for(String field:subset) {
-        int index = expected.getFieldIndex(field);
+        int index = expected.getSchema().getField(field).pos();
         expected.put(index, employee.get(index));
       }
 
-      assertEquals(expected, after);        
+      assertEqualEmployeeObjects(expected, after);
+    }
+  }
+  
+  /**
+   * Simple function which iterates through a before (put) and after (get) object
+   * in an attempt to verify that the same fields and values have been obtained.
+   * Within the original employee object we iterate from 1 instead of 0 due to the 
+   * removal of the '__g__' field at position 0 when we put objects into the datastore. 
+   * This field is used to identify whether fields within the object, and 
+   * consequently the object itself, are dirty; however, this field is not 
+   * required when persisting the object.
+   * We explicitly get values from each field as this makes it easier to debug 
+   * if tests go wrong.
+   * @param employee
+   * @param after
+   */
+  private static void assertEqualEmployeeObjects(Employee employee, Employee after) {
+    //for (int i = 1; i < employee.SCHEMA$.getFields().size(); i++) {
+    //  for (int j = 1; j < after.SCHEMA$.getFields().size(); j++) {
+    //    assertEquals(employee.SCHEMA$.getFields().get(i), after.SCHEMA$.getFields().get(j));
+    //  }
+    //}
+    //check name field
+    CharSequence beforeName = employee.getName();
+    CharSequence afterName = after.getName();
+    assertEquals(beforeName, afterName);
+    //check dateOfBirth field
+    Long beforeDOB = employee.getDateOfBirth();
+    Long afterDOB = after.getDateOfBirth();
+    assertEquals(beforeDOB, afterDOB);
+    //check ssn field
+    CharSequence beforeSsn = employee.getSsn();
+    CharSequence afterSsn = after.getSsn();
+    assertEquals(beforeSsn, afterSsn);
+    //check salary field
+    Integer beforeSalary = employee.getSalary();
+    Integer afterSalary = after.getSalary();
+    assertEquals(beforeSalary, afterSalary);
+    //check boss field
+    if (employee.getBoss() != null) {
+      if (employee.getBoss() instanceof Utf8) {
+        String beforeBoss = employee.getBoss().toString();
+        String afterBoss = after.getBoss().toString();
+        assertEquals("Boss String field values in UNION should be the same",
+            beforeBoss, afterBoss);
+      } else {
+        Employee beforeBoss = (Employee) employee.getBoss();
+        Employee afterBoss = (Employee) after.getBoss();
+        assertEqualEmployeeObjects(beforeBoss, afterBoss);
+      }
+    }
+    //check webpage field
+    if (employee.getWebpage() != null) {
+      WebPage beforeWebPage = employee.getWebpage();
+      WebPage afterWebPage = after.getWebpage();
+      assertEqualWebPageObjects(beforeWebPage, afterWebPage);
     }
   }
 
+  /**
+   * Mimics {@link org.apache.gora.store.DataStoreTestUtil#assertEqualEmployeeObjects(Employee, Employee)}
+   * in that we pick our way through fields within before and after 
+   * {@link org.apache.gora.examples.generated.WebPage} objects comparing field values.
+   * @param beforeWebPage
+   * @param afterWebPage
+   */
+  private static void assertEqualWebPageObjects(WebPage beforeWebPage, WebPage afterWebPage) {
+    //check url field
+    CharSequence beforeUrl = beforeWebPage.getUrl();
+    CharSequence afterUrl = afterWebPage.getUrl();
+    assertEquals(beforeUrl, afterUrl);
+    //check content field
+    ByteBuffer beforeContent = beforeWebPage.getContent();
+    ByteBuffer afterContent = afterWebPage.getContent();
+    assertEquals(beforeContent, afterContent);
+    //check parsedContent field
+    List<CharSequence> beforeParsedContent = 
+        (List<CharSequence>) beforeWebPage.getParsedContent();
+    List<CharSequence> afterParsedContent = 
+        (List<CharSequence>) afterWebPage.getParsedContent();
+    assertEquals(beforeParsedContent, afterParsedContent);
+    //check outlinks field
+    Map<CharSequence, CharSequence> beforeOutlinks = 
+        (Map<java.lang.CharSequence,java.lang.CharSequence>) beforeWebPage.getOutlinks();
+    Map<CharSequence, CharSequence> afterOutlinks = 
+        (Map<java.lang.CharSequence,java.lang.CharSequence>) afterWebPage.getOutlinks();
+    assertEquals(beforeOutlinks, afterOutlinks);
+    //check metadata field
+    if (beforeWebPage.get(5) != null) {
+      Metadata beforeMetadata = beforeWebPage.getMetadata();
+      Metadata afterMetadata = afterWebPage.getMetadata();
+      assertEqualMetadataObjects(beforeMetadata, afterMetadata);
+    }
+  }
+
+  /**
+   * Mimics {@link org.apache.gora.store.DataStoreTestUtil#assertEqualEmployeeObjects(Employee, Employee)}
+   * in that we pick our way through fields within before and after 
+   * {@link org.apache.gora.examples.generated.Metadata} objects comparing field values.
+   * @param beforeMetadata
+   * @param afterMetadata
+   */
+  private static void assertEqualMetadataObjects(Metadata beforeMetadata, Metadata afterMetadata) {
+    //check version field
+    int beforeVersion = beforeMetadata.getVersion();
+    int afterVersion = afterMetadata.getVersion();
+    assertEquals(beforeVersion, afterVersion);
+    //check data field
+    Map<CharSequence, CharSequence> beforeData = beforeMetadata.getData();
+    Map<CharSequence, CharSequence> afterData =  afterMetadata.getData();
+    assertEquals(beforeData, afterData);
+  }
+
   public static Employee testPutEmployee(DataStore<String, Employee> dataStore)
   throws IOException, Exception {
     dataStore.createSchema();
@@ -306,6 +420,16 @@ public class DataStoreTestUtil {
     assertNull(employee);
   }
 
+  /**
+   * Here we create 5 {@link org.apache.gora.examples.generated.Employee} objects
+   * before populating fields with data and flushing them to the datastore.
+   * We then update the 1st of the {@link org.apache.gora.examples.generated.Employee}'s
+   * with more data and flush this data. Assertions are then made over the updated
+   * {@link org.apache.gora.examples.generated.Employee} object.
+   * @param dataStore
+   * @throws IOException
+   * @throws Exception
+   */
   public static void testUpdateEmployee(DataStore<String, Employee> dataStore)
   throws IOException, Exception {
     dataStore.createSchema();
@@ -313,7 +437,7 @@ public class DataStoreTestUtil {
     long now = System.currentTimeMillis();
 
     for (int i = 0; i < 5; i++) {
-      Employee employee = dataStore.newPersistent();
+      Employee employee = Employee.newBuilder().build();
       employee.setName(new Utf8("John Doe " + i));
       employee.setDateOfBirth(now - 20L *  YEAR_IN_MS);
       employee.setSalary(100000);
@@ -324,9 +448,9 @@ public class DataStoreTestUtil {
     dataStore.flush();
 
     for (int i = 0; i < 1; i++) {
-      Employee employee = dataStore.newPersistent();
+      Employee employee = Employee.newBuilder().build();
       employee.setName(new Utf8("John Doe " + (i + 5)));
-      employee.setDateOfBirth(now - 18L *  YEAR_IN_MS);
+      employee.setDateOfBirth(now - 18L * YEAR_IN_MS);
       employee.setSalary(120000);
       employee.setSsn(new Utf8(Long.toString(ssn + i)));
       dataStore.put(employee.getSsn().toString(), employee);
@@ -337,33 +461,42 @@ public class DataStoreTestUtil {
     for (int i = 0; i < 1; i++) {
       String key = Long.toString(ssn + i);
       Employee employee = dataStore.get(key);
-      assertEquals(now - 18L * YEAR_IN_MS, employee.getDateOfBirth());
+      assertEquals(now - 18L * YEAR_IN_MS, employee.getDateOfBirth().longValue()); 
       assertEquals("John Doe " + (i + 5), employee.getName().toString());
-      assertEquals(120000, employee.getSalary());
+      assertEquals(120000, employee.getSalary().intValue()); 
     }
   }
 
-  public static void testUpdateWebPage(DataStore<String, WebPage> dataStore)
+  /**
+   * Here we create 7 {@link org.apache.gora.examples.generated.WebPage}
+   * objects and populate field data before flushing the objects to the 
+   * datastore. We then get the objects, adding data to the 'content' and
+   * 'parsedContent' fields; the 'outlinks' field is not modified by this 
+   * test. This data is then flushed to the datastore. 
+   * Finally we get the {@link org.apache.gora.examples.generated.WebPage}
+   * objects and make various assertions over various fields. This tests 
+   * that we can update fields and that data can be written and read correctly.
+   * @param dataStore
+   * @throws IOException
+   * @throws Exception
+   */
+  public static void testUpdateWebPagePutToArray(DataStore<String, WebPage> dataStore)
   throws IOException, Exception {
     dataStore.createSchema();
 
     String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
-        "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g"};
+        "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
     String content = "content";
     String parsedContent = "parsedContent";
-    String anchor = "anchor";
 
     int parsedContentCount = 0;
 
 
     for (int i = 0; i < urls.length; i++) {
-      WebPage webPage = dataStore.newPersistent();
+      WebPage webPage = WebPage.newBuilder().build();
       webPage.setUrl(new Utf8(urls[i]));
       for (parsedContentCount = 0; parsedContentCount < 5; parsedContentCount++) {
-        webPage.addToParsedContent(new Utf8(parsedContent + i + "," + parsedContentCount));
-      }
-      for (int j = 0; j < urls.length; j += 2) {
-        webPage.putToOutlinks(new Utf8(anchor + j), new Utf8(urls[j]));
+        webPage.getParsedContent().add(new Utf8(parsedContent + i + "," + parsedContentCount));
       }
       dataStore.put(webPage.getUrl().toString(), webPage);
     }
@@ -374,15 +507,7 @@ public class DataStoreTestUtil {
       WebPage webPage = dataStore.get(urls[i]);
       webPage.setContent(ByteBuffer.wrap(ByteUtils.toBytes(content + i)));
       for (parsedContentCount = 5; parsedContentCount < 10; parsedContentCount++) {
-        webPage.addToParsedContent(new Utf8(parsedContent + i + "," + parsedContentCount));
-      }
-      webPage.getOutlinks().clear();
-      for (int j = 1; j < urls.length; j += 2) {
-        webPage.putToOutlinks(new Utf8(anchor + j), new Utf8(urls[j]));
-      }
-      //test for double put of same entries
-      for (int j = 1; j < urls.length; j += 2) {
-        webPage.putToOutlinks(new Utf8(anchor + j), new Utf8(urls[j]));
+        webPage.getParsedContent().add(new Utf8(parsedContent + i + "," + parsedContentCount));
       }
       dataStore.put(webPage.getUrl().toString(), webPage);
     }
@@ -394,24 +519,90 @@ public class DataStoreTestUtil {
       assertEquals(content + i, ByteUtils.toString( toByteArray(webPage.getContent()) ));
       assertEquals(10, webPage.getParsedContent().size());
       int j = 0;
-      for (Utf8 pc : webPage.getParsedContent()) {
+      for (CharSequence pc : webPage.getParsedContent()) {
         assertEquals(parsedContent + i + "," + j, pc.toString());
         j++;
       }
+    }
+  }
+
+  public static void testUpdateWebPagePutToNotNullableMap(DataStore<String, WebPage> dataStore)
+  throws IOException, Exception {
+    dataStore.createSchema();
+
+    String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+        "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
+    String anchor = "anchor";
+
+    // putting evens
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = WebPage.newBuilder().build();
+      webPage.setUrl(new Utf8(urls[i]));
+      for (int j = 0; j < urls.length; j += 2) {
+        webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
+      }
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+    dataStore.flush();
+
+    // putting odds
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = dataStore.get(urls[i]);
+      webPage.getOutlinks().clear();
+      for (int j = 1; j < urls.length; j += 2) {
+        webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
+      }
+      // test for double put of same entries
+      for (int j = 1; j < urls.length; j += 2) {
+        webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
+      }
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+    dataStore.flush();
+
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = dataStore.get(urls[i]);
       int count = 0;
-      for (j = 1; j < urls.length; j += 2) {
-        Utf8 link = webPage.getOutlinks().get(new Utf8(anchor + j));
+      for (int j = 1; j < urls.length; j += 2) {
+        CharSequence link = webPage.getOutlinks().get(new Utf8(anchor + j));
         assertNotNull(link);
         assertEquals(urls[j], link.toString());
         count++;
       }
       assertEquals(count, webPage.getOutlinks().size());
     }
+  }
+
+  public static void testUpdateWebPagePutToNullableMap(DataStore<String, WebPage> dataStore)
+  throws IOException, Exception {
+    dataStore.createSchema();
+
+    String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+        "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
+    String header = "header";
+    String[] headers = { "firstHeader", "secondHeader", "thirdHeader",
+        "fourthHeader", "fifthHeader", "sixthHeader" };
+
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = WebPage.newBuilder().build();
+      webPage.setUrl(new Utf8(urls[i]));
+      //test put for nullable map field 
+      // we put data to the 'headers' field which is a Map with default value of 'null'
+      webPage.setHeaders(new HashMap<CharSequence, CharSequence>());
+      for (int j = 0; j < headers.length; j += 2) {
+        webPage.getHeaders().put(new Utf8(header + j), new Utf8(headers[j]));
+      }
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+
+    dataStore.flush();
 
     for (int i = 0; i < urls.length; i++) {
       WebPage webPage = dataStore.get(urls[i]);
-      for (int j = 0; j < urls.length; j += 2) {
-        webPage.putToOutlinks(new Utf8(anchor + j), new Utf8(urls[j]));
+      //webPage.getHeaders().clear(); //TODO clear method does not work
+      webPage.setHeaders(new HashMap<CharSequence, CharSequence>());
+      for (int j = 1; j < headers.length; j += 2) {
+        webPage.getHeaders().put(new Utf8(header + j), new Utf8(headers[j]));
       }
       dataStore.put(webPage.getUrl().toString(), webPage);
     }
@@ -421,12 +612,93 @@ public class DataStoreTestUtil {
     for (int i = 0; i < urls.length; i++) {
       WebPage webPage = dataStore.get(urls[i]);
       int count = 0;
+      for (int j = 1; j < headers.length; j += 2) {
+        CharSequence headerSample = webPage.getHeaders().get(new Utf8(header + j));
+        assertNotNull(headerSample);
+        assertEquals(headers[j], headerSample.toString());
+        count++;
+      }
+      assertEquals(count, webPage.getHeaders().size());
+    }
+  }
+
+  public static void testUpdateWebPageRemoveMapEntry(DataStore<String, WebPage> dataStore)
+  throws IOException, Exception {
+    dataStore.createSchema();
+
+    String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+        "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
+    String anchor = "anchor";
+
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = WebPage.newBuilder().build();
+      webPage.setUrl(new Utf8(urls[i]));
       for (int j = 0; j < urls.length; j++) {
-        Utf8 link = webPage.getOutlinks().get(new Utf8(anchor + j));
-        assertNotNull(link);
-        assertEquals(urls[j], link.toString());
+        webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
+      }
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+
+    dataStore.flush();
+
+    // map entry removal test
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = dataStore.get(urls[i]);
+      for (int j = 1; j < urls.length; j += 2) {
+        webPage.getOutlinks().remove(new Utf8(anchor + j));
+      }
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+
+    dataStore.flush();
+
+    for (int i = 0; i < urls.length; i++) {
+      int count = 0;
+      WebPage webPage = dataStore.get(urls[i]);
+      for (int j = 1; j < urls.length; j += 2) {
+        CharSequence link = webPage.getOutlinks().get(new Utf8(anchor + j));
+        assertNull(link);
+        //assertEquals(urls[j], link.toString());
         count++;
       }
+      assertEquals(urls.length - count, webPage.getOutlinks().size());
+    }
+  }
+
+  public static void testUpdateWebPageRemoveField(DataStore<String, WebPage> dataStore)
+  throws IOException, Exception {
+    dataStore.createSchema();
+
+    String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+        "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
+    String header = "header";
+    String[] headers = { "firstHeader", "secondHeader", "thirdHeader",
+        "fourthHeader", "fifthHeader", "sixthHeader" };
+
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = WebPage.newBuilder().build();
+      webPage.setUrl(new Utf8(urls[i]));
+      webPage.setHeaders(new HashMap<CharSequence, CharSequence>());
+      for (int j = 0; j < headers.length; j++) {
+        webPage.getHeaders().put(new Utf8(header + j), new Utf8(headers[j]));
+      }
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+
+    dataStore.flush();
+
+    // nullable map field removal test
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = dataStore.get(urls[i]);
+      webPage.setHeaders(null);
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+
+    dataStore.flush();
+
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = dataStore.get(urls[i]);
+      assertNull(webPage.getHeaders());
     }
   }
 
@@ -440,19 +712,20 @@ public class DataStoreTestUtil {
         " actual=" + CONTENTS[i] + " i=" + i
         , Arrays.equals( toByteArray(page.getContent() )
         , CONTENTS[i].getBytes()));
-      GenericArray<Utf8> parsedContent = page.getParsedContent();
+    
+      List<CharSequence> parsedContent = page.getParsedContent();
       assertNotNull(parsedContent);
       assertTrue(parsedContent.size() > 0);
     
       int j=0;
       String[] tokens = CONTENTS[i].split(" ");
-      for(Utf8 token : parsedContent) {
+      for(CharSequence token : parsedContent) {
         assertEquals(tokens[j++], token.toString());
       }
     } else {
       // when page.getContent() is null
       assertTrue(CONTENTS[i] == null) ;
-      GenericArray<Utf8> parsedContent = page.getParsedContent();
+      List<CharSequence> parsedContent = page.getParsedContent();
       assertNotNull(parsedContent);
       assertTrue(parsedContent.size() == 0);
     }
@@ -462,7 +735,7 @@ public class DataStoreTestUtil {
       assertTrue(page.getOutlinks().size() > 0);
       for(int k=0; k<LINKS[i].length; k++) {
         assertEquals(ANCHORS[i][k],
-          page.getFromOutlinks(new Utf8(URLS[LINKS[i][k]])).toString());
+          page.getOutlinks().get(new Utf8(URLS[LINKS[i][k]])).toString());
       }
     } else {
       assertTrue(page.getOutlinks() == null || page.getOutlinks().isEmpty());
@@ -480,7 +753,7 @@ public class DataStoreTestUtil {
   }
 
   public static void testGetWebPage(DataStore<String, WebPage> store) throws IOException, Exception {
-    testGetWebPage(store, WebPage._ALL_FIELDS);
+    testGetWebPage(store, getFields(WebPage.SCHEMA$.getFields()));
   }
 
   public static void testGetWebPageDefaultFields(DataStore<String, WebPage> store)
@@ -507,7 +780,7 @@ public class DataStoreTestUtil {
 
   public static void testQueryWebPageSingleKey(DataStore<String, WebPage> store)
   throws IOException, Exception {
-    testQueryWebPageSingleKey(store, WebPage._ALL_FIELDS);
+    testQueryWebPageSingleKey(store, getFields(WebPage.SCHEMA$.getFields()));
   }
 
   public static void testQueryWebPageSingleKeyDefaultFields(
@@ -714,7 +987,7 @@ public class DataStoreTestUtil {
     WebPageDataCreator.createWebPageData(store);
 
     query = store.newQuery();
-    query.setFields(WebPage._ALL_FIELDS);
+    query.setFields(AvroUtils.getSchemaFieldNames(WebPage.SCHEMA$));
 
     assertNumResults(store.newQuery(), URLS.length);
     store.deleteByQuery(query);
@@ -757,8 +1030,8 @@ public class DataStoreTestUtil {
     WebPageDataCreator.createWebPageData(store);
 
     query = store.newQuery();
-    query.setFields(WebPage.Field.OUTLINKS.getName()
-        , WebPage.Field.PARSED_CONTENT.getName(), WebPage.Field.CONTENT.getName());
+    query.setFields("outlinks"
+        , "parsedContent", "content");
 
     assertNumResults(store.newQuery(), URLS.length);
     store.deleteByQuery(query);
@@ -776,7 +1049,8 @@ public class DataStoreTestUtil {
 
       assertNotNull(page.getUrl());
       assertEquals(page.getUrl().toString(), SORTED_URLS[i]);
-      assertEquals(0, page.getOutlinks().size());
+      assertEquals("Map of Outlinks should have a size of '0' as the deleteByQuery "
+          + "not only removes the data but also the data structure.", 0, page.getOutlinks().size());
       assertEquals(0, page.getParsedContent().size());
       if(page.getContent() != null) {
         System.out.println("url:" + page.getUrl().toString());
@@ -790,7 +1064,7 @@ public class DataStoreTestUtil {
     WebPageDataCreator.createWebPageData(store);
 
     query = store.newQuery();
-    query.setFields(WebPage.Field.URL.getName());
+    query.setFields("url");
     String startKey = SORTED_URLS[NUM_KEYS];
     String endKey = SORTED_URLS[SORTED_URLS.length - NUM_KEYS];
     query.setStartKey(startKey);
@@ -832,10 +1106,10 @@ public class DataStoreTestUtil {
     String url = "http://foo.com/";
 
     store.createSchema();
-    WebPage page = store.newPersistent();
-    Metadata metadata = new Metadata();
+    WebPage page = WebPage.newBuilder().build();
+    Metadata metadata = Metadata.newBuilder().build();
     metadata.setVersion(1);
-    metadata.putToData(new Utf8("foo"), new Utf8("baz"));
+    metadata.getData().put(new Utf8("foo"), new Utf8("baz"));
 
     page.setMetadata(metadata);
     page.setUrl(new Utf8(url));
@@ -846,30 +1120,31 @@ public class DataStoreTestUtil {
     page = store.get(revUrl);
     metadata = page.getMetadata();
     assertNotNull(metadata);
-    assertEquals(1, metadata.getVersion());
+    assertEquals(1, metadata.getVersion().intValue()); 
     assertEquals(new Utf8("baz"), metadata.getData().get(new Utf8("foo")));
   }
 
   public static void testPutArray(DataStore<String, WebPage> store)
           throws IOException, Exception {
     store.createSchema();
-    WebPage page = store.newPersistent();
+    WebPage page = WebPage.newBuilder().build();
 
     String[] tokens = {"example", "content", "in", "example.com"};
-
+    page.setParsedContent(new ArrayList<CharSequence>());
     for(String token: tokens) {
-      page.addToParsedContent(new Utf8(token));
+      page.getParsedContent().add(new Utf8(token));
     }
 
     store.put("com.example/http", page);
     store.close();
+
   }
 
   public static byte[] testPutBytes(DataStore<String, WebPage> store)
           throws IOException, Exception {
 
     store.createSchema();
-    WebPage page = store.newPersistent();
+    WebPage page = WebPage.newBuilder().build();
     page.setUrl(new Utf8("http://example.com"));
     byte[] contentBytes = "example content in example.com".getBytes();
     ByteBuffer buff = ByteBuffer.wrap(contentBytes);
@@ -886,12 +1161,12 @@ public class DataStoreTestUtil {
 
     store.createSchema();
 
-    WebPage page = store.newPersistent();
+    WebPage page = WebPage.newBuilder().build();
 
     page.setUrl(new Utf8("http://example.com"));
-    page.putToOutlinks(new Utf8("http://example2.com"), new Utf8("anchor2"));
-    page.putToOutlinks(new Utf8("http://example3.com"), new Utf8("anchor3"));
-    page.putToOutlinks(new Utf8("http://example3.com"), new Utf8("anchor4"));
+    page.getOutlinks().put(new Utf8("http://example2.com"), new Utf8("anchor2"));
+    page.getOutlinks().put(new Utf8("http://example3.com"), new Utf8("anchor3"));
+    page.getOutlinks().put(new Utf8("http://example3.com"), new Utf8("anchor4"));
     store.put("com.example/http", page);
     store.close();
   }
@@ -905,5 +1180,23 @@ public class DataStoreTestUtil {
     }
     return bytes;
   }
-
+  
+  public static String[] getFields(List<Field> schemaFields) {
+    
+    List<Field> list = new ArrayList<Field>();
+    for (Field field : schemaFields) {
+      if (!Persistent.DIRTY_BYTES_FIELD_NAME.equalsIgnoreCase(field.name())) {
+        list.add(field);
+      }
+    }
+    schemaFields = list;
+    
+    String[] fieldNames = new String[schemaFields.size()];
+    for(int i = 0; i<fieldNames.length; i++ ){
+      fieldNames[i] = schemaFields.get(i).name();
+    }
+    
+    return fieldNames;
+  }
+  
 }

Modified: gora/trunk/gora-core/src/test/java/org/apache/gora/store/TestDataStoreFactory.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/test/java/org/apache/gora/store/TestDataStoreFactory.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-core/src/test/java/org/apache/gora/store/TestDataStoreFactory.java (original)
+++ gora/trunk/gora-core/src/test/java/org/apache/gora/store/TestDataStoreFactory.java Sat Apr 12 19:21:53 2014
@@ -20,9 +20,9 @@ package org.apache.gora.store;
 
 import java.util.Properties;
 
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotSame;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
 
 import org.apache.gora.avro.store.DataFileAvroStore;
 import org.apache.gora.mock.persistency.MockPersistent;

Modified: gora/trunk/gora-core/src/test/java/org/apache/gora/util/TestIOUtils.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/test/java/org/apache/gora/util/TestIOUtils.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-core/src/test/java/org/apache/gora/util/TestIOUtils.java (original)
+++ gora/trunk/gora-core/src/test/java/org/apache/gora/util/TestIOUtils.java Sat Apr 12 19:21:53 2014
@@ -27,11 +27,10 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
 
-import org.apache.avro.ipc.ByteBufferInputStream;
-import org.apache.avro.ipc.ByteBufferOutputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.avro.util.ByteBufferInputStream;
+import org.apache.avro.util.ByteBufferOutputStream;
 import org.apache.gora.mapreduce.GoraMapReduceUtils;
+import org.apache.gora.util.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -39,6 +38,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.junit.Test;
 import static org.junit.Assert.assertEquals;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test case for {@link IOUtils} class.

Modified: gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java (original)
+++ gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java Sat Apr 12 19:21:53 2014
@@ -36,7 +36,6 @@ import org.apache.gora.persistency.Persi
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
-import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.store.ws.impl.WSDataStoreBase;
 import org.apache.gora.util.GoraException;
 import org.slf4j.Logger;
@@ -162,11 +161,11 @@ public class DynamoDBStore<K, T extends 
       LOG.debug("Initializing DynamoDB store");
       getCredentials();
       setWsProvider(wsProvider);
-      preferredSchema = DataStoreFactory.findProperty(properties, this, PREF_SCH_NAME, null);
-      dynamoDBClient = getClient(DataStoreFactory.findProperty(properties, this, CLI_TYP_PROP, null),(AWSCredentials)getConf());
-      dynamoDBClient.setEndpoint(DataStoreFactory.findProperty(properties, this, ENDPOINT_PROP, null));
+      preferredSchema = properties.getProperty(PREF_SCH_NAME);
+      dynamoDBClient = getClient(properties.getProperty(CLI_TYP_PROP),(AWSCredentials)getConf());
+      dynamoDBClient.setEndpoint(properties.getProperty(ENDPOINT_PROP));
       mapping = readMapping();
-      consistency = DataStoreFactory.findProperty(properties, this, CONSISTENCY_READS, null);
+      consistency = properties.getProperty(CONSISTENCY_READS);
       persistentClass = pPersistentClass;
     }
     catch (Exception e) {

Modified: gora/trunk/gora-hbase/pom.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/pom.xml?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/pom.xml (original)
+++ gora/trunk/gora-hbase/pom.xml Sat Apr 12 19:21:53 2014
@@ -100,6 +100,7 @@
         <dependency>
             <groupId>org.apache.gora</groupId>
             <artifactId>gora-core</artifactId>
+            <scope>compile</scope>
         </dependency>
 
         <dependency>
@@ -108,45 +109,49 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <!-- END of Gora Internal Dependencies -->
 
-        <!-- Hadoop Dependencies -->
         <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase</artifactId>
+            <scope>compile</scope>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase</artifactId>
-            <type>test-jar</type>
-        </dependency>
-
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
+            <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
+            <scope>compile</scope>
         </dependency>
 
         <!-- Misc Dependencies -->
         <dependency>
             <groupId>org.jdom</groupId>
             <artifactId>jdom</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <scope>compile</scope>
         </dependency>
 
         <!-- Logging Dependencies -->
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
+            <scope>compile</scope>
         </dependency>
 
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-simple</artifactId>
+            <scope>compile</scope>
         </dependency>
 
         <dependency>
             <groupId>log4j</groupId>
             <artifactId>log4j</artifactId>
+            <scope>runtime</scope>
 	        <exclusions>
 	          <exclusion>
                 <groupId>javax.jms</groupId>
@@ -154,18 +159,28 @@
 	          </exclusion>
             </exclusions>
         </dependency>
+        <!-- END of Logging Dependencies -->
 
         <!-- Testing Dependencies -->
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
+            <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-test</artifactId>
+            <scope>test</scope>
         </dependency>
-
+        
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <!-- END of Testing Dependencies -->
     </dependencies>
 
 </project>

Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java Sat Apr 12 19:21:53 2014
@@ -21,7 +21,6 @@ package org.apache.gora.hbase.query;
 import java.io.IOException;
 
 import org.apache.gora.hbase.store.HBaseStore;
-import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.hadoop.hbase.client.Get;

Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java Sat Apr 12 19:21:53 2014
@@ -23,7 +23,6 @@ import static org.apache.gora.hbase.util
 import java.io.IOException;
 
 import org.apache.gora.hbase.store.HBaseStore;
-import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.ResultBase;

Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Sat Apr 12 19:21:53 2014
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -36,21 +35,15 @@ import java.util.Set;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericArray;
 import org.apache.avro.util.Utf8;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.gora.hbase.query.HBaseGetResult;
 import org.apache.gora.hbase.query.HBaseQuery;
 import org.apache.gora.hbase.query.HBaseScannerResult;
 import org.apache.gora.hbase.store.HBaseMapping.HBaseMappingBuilder;
 import org.apache.gora.hbase.util.HBaseByteInterface;
 import org.apache.gora.hbase.util.HBaseFilterUtil;
-import org.apache.gora.persistency.ListGenericArray;
-import org.apache.gora.persistency.State;
-import org.apache.gora.persistency.StateManager;
-import org.apache.gora.persistency.StatefulHashMap;
-import org.apache.gora.persistency.StatefulMap;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
@@ -74,6 +67,8 @@ import org.apache.hadoop.hbase.util.Pair
 import org.jdom.Document;
 import org.jdom.Element;
 import org.jdom.input.SAXBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * DataStore for HBase. Thread safe.
@@ -135,7 +130,7 @@ implements Configurable {
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
-    
+
     // Set scanner caching option
     try {
       this.setScannerCaching(
@@ -146,7 +141,7 @@ implements Configurable {
       LOG.error("Can not load " + SCANNER_CACHING_PROPERTIES_KEY + " from gora.properties. Setting to default value: " + SCANNER_CACHING_PROPERTIES_DEFAULT, e) ;
       this.setScannerCaching(SCANNER_CACHING_PROPERTIES_DEFAULT) ; // Default value if something is wrong
     }
-    
+
     if(autoCreateSchema) {
       createSchema();
     }
@@ -225,126 +220,117 @@ implements Configurable {
   }
 
   /**
-   * {@inheritDoc}
-   * Serializes the Persistent data and saves in HBase.
-   * Topmost fields of the record are persisted in "raw" format (not avro serialized). This behavior happens
-   * in maps and arrays too.
+   * {@inheritDoc} Serializes the Persistent data and saves in HBase. Topmost
+   * fields of the record are persisted in "raw" format (not avro serialized).
+   * This behavior happens in maps and arrays too.
    * 
-   * ["null","type"] type (a.k.a. optional field) is persisted like as if it is ["type"], but the column get
-   * deleted if value==null (so value read after will be null).
+   * ["null","type"] type (a.k.a. optional field) is persisted like as if it is
+   * ["type"], but the column get deleted if value==null (so value read after
+   * will be null).
    * 
-   * @param persistent Record to be persisted in HBase
+   * @param persistent
+   *          Record to be persisted in HBase
    */
-  @SuppressWarnings({ "unchecked", "rawtypes" })
   @Override
   public void put(K key, T persistent) {
-    try{
+    try {
       Schema schema = persistent.getSchema();
-      StateManager stateManager = persistent.getStateManager();
       byte[] keyRaw = toBytes(key);
       Put put = new Put(keyRaw);
       Delete delete = new Delete(keyRaw);
-      boolean hasPuts = false;
-      boolean hasDeletes = false;
-      Iterator<Field> iter = schema.getFields().iterator();
-      for (int i = 0; iter.hasNext(); i++) {
-        Field field = iter.next();
-        if (!stateManager.isDirty(persistent, i)) {
+      List<Field> fields = schema.getFields();
+      for (int i = 1; i < fields.size(); i++) {
+        if (!persistent.isDirty(i)) {
           continue;
         }
-        Type type = field.schema().getType();
+        Field field = fields.get(i);
         Object o = persistent.get(i);
         HBaseColumn hcol = mapping.getColumn(field.name());
         if (hcol == null) {
-          throw new RuntimeException("HBase mapping for field ["+ persistent.getClass().getName() +
-              "#"+ field.name()+"] not found. Wrong gora-hbase-mapping.xml?");
-        }
-        switch(type) {
-          case MAP:
-            if(o instanceof StatefulMap) {
-              StatefulHashMap<Utf8, ?> map = (StatefulHashMap<Utf8, ?>) o;
-              for (Entry<Utf8, State> e : map.states().entrySet()) {
-                Utf8 mapKey = e.getKey();
-                switch (e.getValue()) {
-                  case DIRTY:
-                    byte[] qual = Bytes.toBytes(mapKey.toString());
-                    byte[] val = toBytes(map.get(mapKey), field.schema().getValueType());
-                    // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
-                    if (val == null) { // value == null => must delete the column
-                      delete.deleteColumn(hcol.getFamily(), qual);
-                      hasDeletes = true;
-                    } else {
-                      put.add(hcol.getFamily(), qual, val);
-                      hasPuts = true;
-                    }
-                    break;
-                  case DELETED:
-                    qual = Bytes.toBytes(mapKey.toString());
-                    hasDeletes = true;
-                    delete.deleteColumn(hcol.getFamily(), qual);
-                    break;
-                  default :
-                    break;
-                }
-              }
-            } else {
-              Set<Map.Entry> set = ((Map)o).entrySet();
-              for(Entry entry: set) {
-                byte[] qual = toBytes(entry.getKey());
-                byte[] val = toBytes(entry.getValue(), field.schema().getValueType());
-                // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
-                if (val == null) { // value == null => must delete the column
-                  delete.deleteColumn(hcol.getFamily(), qual);
-                  hasDeletes = true;
-                } else {
-                  put.add(hcol.getFamily(), qual, val);
-                  hasPuts = true;
-                }
-              }
-            }
-            break;
-          case ARRAY:
-            if(o instanceof GenericArray) {
-              GenericArray arr = (GenericArray) o;
-              int j=0;
-              for(Object item : arr) {
-                byte[] val = toBytes(item, field.schema().getElementType());
-                // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
-                if (val == null) { // value == null => must delete the column
-                  delete.deleteColumn(hcol.getFamily(), Bytes.toBytes(j++));
-                  hasDeletes = true;
-                } else {
-                  put.add(hcol.getFamily(), Bytes.toBytes(j++), val);
-                  hasPuts = true;
-                }
-              }
-            }
-            break;
-          default:
-            // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
-            byte[] serializedBytes = toBytes(o, field.schema()) ;
-            if (serializedBytes == null) { // value == null => must delete the column
-              delete.deleteColumn(hcol.getFamily(), hcol.getQualifier());
-              hasDeletes = true;
-            } else {
-              put.add(hcol.getFamily(), hcol.getQualifier(), serializedBytes);
-              hasPuts = true;
-            }
-            break;
+          throw new RuntimeException("HBase mapping for field ["
+              + persistent.getClass().getName() + "#" + field.name()
+              + "] not found. Wrong gora-hbase-mapping.xml?");
         }
+        addPutsAndDeletes(put, delete, o, field.schema().getType(),
+            field.schema(), hcol, hcol.getQualifier());
       }
-      if (hasPuts) {
+      if (put.size() > 0) {
         table.put(put);
       }
-      if (hasDeletes) {
+      if (delete.size() > 0) {
         table.delete(delete);
+        table.delete(delete);
+        table.delete(delete); // repeated on purpose: HBase sometimes, arbitrarily, does not apply a delete
       }
-    } catch(IOException ex2){
+    } catch (IOException ex2) {
       LOG.error(ex2.getMessage());
       LOG.error(ex2.getStackTrace().toString());
     }
   }
 
+  private void addPutsAndDeletes(Put put, Delete delete, Object o, Type type,
+      Schema schema, HBaseColumn hcol, byte[] qualifier) throws IOException {
+    switch (type) {
+    case UNION:
+      if (isNullable(schema) && o == null) {
+        if (qualifier == null) {
+          delete.deleteFamily(hcol.getFamily());
+        } else {
+          delete.deleteColumn(hcol.getFamily(), qualifier);
+        }
+      } else {
+//        int index = GenericData.get().resolveUnion(schema, o);
+        int index = getResolvedUnionIndex(schema);
+        if (index > 1) {  // not a two-type union containing "null" (e.g. more than 2 types): serialize directly for now
+          byte[] serializedBytes = toBytes(o, schema);
+          put.add(hcol.getFamily(), qualifier, serializedBytes);
+        } else {
+          Schema resolvedSchema = schema.getTypes().get(index);
+          addPutsAndDeletes(put, delete, o, resolvedSchema.getType(),
+              resolvedSchema, hcol, qualifier);
+        }
+      }
+      break;
+    case MAP:
+      // if it's a map that has been modified, then the content should be replaced by the new one
+      // This is because we don't know if the content has changed or not.
+      if (qualifier == null) {
+        delete.deleteFamily(hcol.getFamily());
+      } else {
+        delete.deleteColumn(hcol.getFamily(), qualifier);
+      }
+      @SuppressWarnings({ "rawtypes", "unchecked" })
+      Set<Entry> set = ((Map) o).entrySet();
+      for (@SuppressWarnings("rawtypes") Entry entry : set) {
+        byte[] qual = toBytes(entry.getKey());
+        addPutsAndDeletes(put, delete, entry.getValue(), schema.getValueType()
+            .getType(), schema.getValueType(), hcol, qual);
+      }
+      break;
+    case ARRAY:
+      List<?> array = (List<?>) o;
+      int j = 0;
+      for (Object item : array) {
+        addPutsAndDeletes(put, delete, item, schema.getElementType().getType(),
+            schema.getElementType(), hcol, Bytes.toBytes(j++));
+      }
+      break;
+    default:
+      byte[] serializedBytes = toBytes(o, schema);
+      put.add(hcol.getFamily(), qualifier, serializedBytes);
+      break;
+    }
+  }
+
+  private boolean isNullable(Schema unionSchema) {
+    for (Schema innerSchema : unionSchema.getTypes()) {
+      if (innerSchema.getType().equals(Schema.Type.NULL)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   public void delete(T obj) {
     throw new RuntimeException("Not implemented yet");
   }
@@ -373,8 +359,7 @@ implements Configurable {
       String[] fields = getFieldsToQuery(query.getFields());
       //find whether all fields are queried, which means that complete
       //rows will be deleted
-      boolean isAllFields = Arrays.equals(fields
-          , getBeanFactory().getCachedPersistent().getFields());
+      boolean isAllFields = Arrays.equals(fields, getFields());
   
       org.apache.gora.query.Result<K, T> result = null;
       result = query.execute();
@@ -519,18 +504,28 @@ implements Configurable {
             "Wrong gora-hbase-mapping.xml?");
       }
       Schema fieldSchema = fieldMap.get(f).schema();
-      switch (fieldSchema.getType()) {
-        case MAP:
-        case ARRAY:
-          get.addFamily(col.family); break;
-        default:
-          get.addColumn(col.family, col.qualifier); break;
-      }
+      addFamilyOrColumn(get, col, fieldSchema);
     }
   }
 
-  private void addFields(Scan scan, Query<K,T> query)
-  throws IOException {
+  private void addFamilyOrColumn(Get get, HBaseColumn col, Schema fieldSchema) {
+    switch (fieldSchema.getType()) {
+    case UNION:
+      int index = getResolvedUnionIndex(fieldSchema);
+      Schema resolvedSchema = fieldSchema.getTypes().get(index);
+      addFamilyOrColumn(get, col, resolvedSchema);
+      break;
+    case MAP:
+    case ARRAY:
+      get.addFamily(col.family);
+      break;
+    default:
+      get.addColumn(col.family, col.qualifier);
+      break;
+    }
+  }
+
+  private void addFields(Scan scan, Query<K, T> query) throws IOException {
     String[] fields = query.getFields();
     for (String f : fields) {
       HBaseColumn col = mapping.getColumn(f);
@@ -539,19 +534,30 @@ implements Configurable {
             "Wrong gora-hbase-mapping.xml?");
       }
       Schema fieldSchema = fieldMap.get(f).schema();
-      switch (fieldSchema.getType()) {
-        case MAP:
-        case ARRAY:
-          scan.addFamily(col.family); break;
-        default:
-          scan.addColumn(col.family, col.qualifier); break;
-      }
+      addFamilyOrColumn(scan, col, fieldSchema);
     }
   }
 
-  //TODO: HBase Get, Scan, Delete should extend some common interface with addFamily, etc
-  private void addFields(Delete delete, Query<K,T> query)
-    throws IOException {
+  private void addFamilyOrColumn(Scan scan, HBaseColumn col, Schema fieldSchema) {
+    switch (fieldSchema.getType()) {
+    case UNION:
+      int index = getResolvedUnionIndex(fieldSchema);
+      Schema resolvedSchema = fieldSchema.getTypes().get(index);
+      addFamilyOrColumn(scan, col, resolvedSchema);
+      break;
+    case MAP:
+    case ARRAY:
+      scan.addFamily(col.family);
+      break;
+    default:
+      scan.addColumn(col.family, col.qualifier);
+      break;
+    }
+  }
+
+  // TODO: HBase Get, Scan, Delete should extend some common interface with
+  // addFamily, etc
+  private void addFields(Delete delete, Query<K, T> query)    throws IOException {
     String[] fields = query.getFields();
     for (String f : fields) {
       HBaseColumn col = mapping.getColumn(f);
@@ -560,13 +566,25 @@ implements Configurable {
             "Wrong gora-hbase-mapping.xml?");
       }
       Schema fieldSchema = fieldMap.get(f).schema();
-      switch (fieldSchema.getType()) {
-        case MAP:
-        case ARRAY:
-          delete.deleteFamily(col.family); break;
-        default:
-          delete.deleteColumn(col.family, col.qualifier); break;
-      }
+      addFamilyOrColumn(delete, col, fieldSchema);
+    }
+  }
+
+  private void addFamilyOrColumn(Delete delete, HBaseColumn col,
+      Schema fieldSchema) {
+    switch (fieldSchema.getType()) {
+    case UNION:
+      int index = getResolvedUnionIndex(fieldSchema);
+      Schema resolvedSchema = fieldSchema.getTypes().get(index);
+      addFamilyOrColumn(delete, col, resolvedSchema);
+      break;
+    case MAP:
+    case ARRAY:
+      delete.deleteFamily(col.family);
+      break;
+    default:
+      delete.deleteColumn(col.family, col.qualifier);
+      break;
     }
   }
 
@@ -582,7 +600,6 @@ implements Configurable {
     }
   }
 
-  @SuppressWarnings({ "unchecked", "rawtypes" })
   /**
    * Creates a new Persistent instance with the values in 'result' for the fields listed.
    * @param result result form a HTable#get()
@@ -597,7 +614,6 @@ implements Configurable {
       return null;
 
     T persistent = newPersistent();
-    StateManager stateManager = persistent.getStateManager();
     for (String f : fields) {
       HBaseColumn col = mapping.getColumn(f);
       if (col == null) {
@@ -606,50 +622,90 @@ implements Configurable {
       }
       Field field = fieldMap.get(f);
       Schema fieldSchema = field.schema();
-      switch(fieldSchema.getType()) {
-        case MAP:
-          NavigableMap<byte[], byte[]> qualMap =
-            result.getNoVersionMap().get(col.getFamily());
-          if (qualMap == null) {
-            continue;
-          }
-          Schema valueSchema = fieldSchema.getValueType();
-          Map map = new HashMap();
-          for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
-            map.put(new Utf8(Bytes.toString(e.getKey())),
-                fromBytes(valueSchema, e.getValue()));
-          }
-          setField(persistent, field, map);
-          break;
-        case ARRAY:
-          qualMap = result.getFamilyMap(col.getFamily());
-          if (qualMap == null) {
-            continue;
-          }
-          valueSchema = fieldSchema.getElementType();
-          ArrayList arrayList = new ArrayList();
-          for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
-            arrayList.add(fromBytes(valueSchema, e.getValue()));
-          }
-          ListGenericArray arr = new ListGenericArray(fieldSchema, arrayList);
-          setField(persistent, field, arr);
-          break;
-        default:
-          byte[] val = result.getValue(col.getFamily(), col.getQualifier());
-          if (val == null) {
-            continue;
-          }
-          setField(persistent, field, val);
-          break;
-      }
+      setField(result,persistent, col, field, fieldSchema);
     }
-    stateManager.clearDirty(persistent);
+    persistent.clearDirty();
     return persistent;
   }
 
+  private void setField(Result result, T persistent, HBaseColumn col,
+      Field field, Schema fieldSchema) throws IOException {
+    switch (fieldSchema.getType()) {
+    case UNION:
+      int index = getResolvedUnionIndex(fieldSchema);
+      if (index > 1) { // not a two-type union containing "null" (e.g. more than 2 types): deserialize directly for now
+        byte[] val = result.getValue(col.getFamily(), col.getQualifier());
+        if (val == null) {
+          return;
+        }
+        setField(persistent, field, val);
+      } else {
+        Schema resolvedSchema = fieldSchema.getTypes().get(index);
+        setField(result, persistent, col, field, resolvedSchema);
+      }
+      break;
+    case MAP:
+      NavigableMap<byte[], byte[]> qualMap = result.getNoVersionMap().get(
+          col.getFamily());
+      if (qualMap == null) {
+        return;
+      }
+      Schema valueSchema = fieldSchema.getValueType();
+      Map<Utf8, Object> map = new HashMap<Utf8, Object>();
+      for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
+        map.put(new Utf8(Bytes.toString(e.getKey())),
+            fromBytes(valueSchema, e.getValue()));
+      }
+      setField(persistent, field, map);
+      break;
+    case ARRAY:
+      qualMap = result.getFamilyMap(col.getFamily());
+      if (qualMap == null) {
+        return;
+      }
+      valueSchema = fieldSchema.getElementType();
+      ArrayList<Object> arrayList = new ArrayList<Object>();
+      DirtyListWrapper<Object> dirtyListWrapper = new DirtyListWrapper<Object>(arrayList);
+      for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
+        dirtyListWrapper.add(fromBytes(valueSchema, e.getValue()));
+      }
+      setField(persistent, field, arrayList);
+      break;
+    default:
+      byte[] val = result.getValue(col.getFamily(), col.getQualifier());
+      if (val == null) {
+        return;
+      }
+      setField(persistent, field, val);
+      break;
+    }
+  }
+
+  // TODO: temporary solution; it has to change once saving the index of the union type is implemented
+  private int getResolvedUnionIndex(Schema unionScema) {
+    if (unionScema.getTypes().size() == 2) {
+
+      // schema [type0, type1]
+      Type type0 = unionScema.getTypes().get(0).getType();
+      Type type1 = unionScema.getTypes().get(1).getType();
+
+      // Check if types are different and there's a "null", like ["null","type"]
+      // or ["type","null"]
+      if (!type0.equals(type1)
+          && (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL))) {
+
+        if (type0.equals(Schema.Type.NULL))
+          return 1;
+        else
+          return 0;
+      }
+    }
+    return 2;
+  }
+
   @SuppressWarnings({ "unchecked", "rawtypes" })
   private void setField(T persistent, Field field, Map map) {
-    persistent.put(field.pos(), new StatefulHashMap(map));
+    persistent.put(field.pos(), new DirtyMapWrapper(map));
   }
 
   private void setField(T persistent, Field field, byte[] val)
@@ -657,9 +713,9 @@ implements Configurable {
     persistent.put(field.pos(), fromBytes(field.schema(), val));
   }
 
-  @SuppressWarnings("rawtypes")
-  private void setField(T persistent, Field field, GenericArray list) {
-    persistent.put(field.pos(), list);
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private void setField(T persistent, Field field, List list) {
+    persistent.put(field.pos(), new DirtyListWrapper(list));
   }
 
   @SuppressWarnings("unchecked")
@@ -725,7 +781,6 @@ implements Configurable {
             mappingBuilder.addField(fieldName, family, qualifier);
             mappingBuilder.addColumnFamily(tableName, family);
           }
-          
           //we found a matching key and value class definition,
           //do not continue on other class definitions
           break;
@@ -790,4 +845,4 @@ implements Configurable {
     this.scannerCaching = numRows ;
     return this ;
   }
-}
\ No newline at end of file
+}

Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java Sat Apr 12 19:21:53 2014
@@ -19,12 +19,14 @@ package org.apache.gora.hbase.store;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
@@ -35,7 +37,11 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 
@@ -145,18 +151,6 @@ public class HBaseTableConnection implem
   }
 
   @Override
-  public void batch(List<Row> actions, Object[] results) throws IOException,
-      InterruptedException {
-    getTable().batch(actions, results);
-  }
-
-  @Override
-  public Object[] batch(List<Row> actions) throws IOException,
-      InterruptedException {
-    return getTable().batch(actions);
-  }
-
-  @Override
   public Result get(Get get) throws IOException {
     return getTable().get(get);
   }
@@ -254,4 +248,78 @@ public class HBaseTableConnection implem
   public void unlockRow(RowLock rl) throws IOException {
     getTable().unlockRow(rl);
   }
+
+  @Override
+  public void batch(List<? extends Row> actions, Object[] results)
+      throws IOException, InterruptedException {
+    // Forwards the batch and the results array unchanged to the table returned by getTable().
+    getTable().batch(actions, results);
+    
+  }
+
+  @Override
+  public Object[] batch(List<? extends Row> actions) throws IOException,
+      InterruptedException {
+    // Forwards the batch to the table returned by getTable() and hands back its result array.
+    return getTable().batch(actions);
+  }
+
+  @Override
+  public void mutateRow(RowMutations rm) throws IOException {
+    // Forward to the underlying table, like get()/batch(), instead of silently dropping the mutation.
+    getTable().mutateRow(rm);
+  }
+
+  @Override
+  public Result append(Append append) throws IOException {
+    // Forward to the underlying table; the former stub returned null without appending anything.
+    return getTable().append(append);
+  }
+
+  @Override
+  public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol,
+      byte[] row) {
+    // TODO: not implemented yet; both arguments are ignored and callers always receive null.
+    return null;
+  }
+
+  @Override
+  public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(
+      Class<T> protocol, byte[] startKey, byte[] endKey, Call<T, R> callable)
+      throws IOException, Throwable {
+    // Forward to the underlying table rather than answering every call with null.
+    return getTable().coprocessorExec(protocol, startKey, endKey, callable);
+  }
+
+  @Override
+  public <T extends CoprocessorProtocol, R> void coprocessorExec(
+      Class<T> protocol, byte[] startKey, byte[] endKey, Call<T, R> callable,
+      Callback<R> callback) throws IOException, Throwable {
+    // Forward to the underlying table instead of ignoring the call and its callback.
+    getTable().coprocessorExec(protocol, startKey, endKey, callable, callback);
+  }
+
+  @Override
+  public void setAutoFlush(boolean autoFlush) {
+    // TODO: not implemented yet; the requested auto-flush value is ignored (no-op).
+    
+  }
+
+  @Override
+  public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+    // TODO: not implemented yet; both flags are ignored (no-op).
+    
+  }
+
+  @Override
+  public long getWriteBufferSize() {
+    // TODO: not implemented yet; always reports 0 rather than a real buffer size.
+    return 0;
+  }
+
+  @Override
+  public void setWriteBufferSize(long writeBufferSize) throws IOException {
+    // TODO: not implemented yet; the requested size is ignored (no-op).
+    
+  }
 }

Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java Sat Apr 12 19:21:53 2014
@@ -20,10 +20,8 @@ package org.apache.gora.hbase.util;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
@@ -31,12 +29,13 @@ import org.apache.avro.generic.GenericDa
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
+
 import org.apache.gora.util.AvroUtils;
-import org.apache.gora.avro.PersistentDatumReader;
-import org.apache.gora.avro.PersistentDatumWriter;
+
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -47,27 +46,13 @@ public class HBaseByteInterface {
   /**
    * Threadlocals maintaining reusable binary decoders and encoders.
    */
+  private static ThreadLocal<ByteArrayOutputStream> outputStream =
+      new ThreadLocal<ByteArrayOutputStream>();
+  
   public static final ThreadLocal<BinaryDecoder> decoders =
       new ThreadLocal<BinaryDecoder>();
-  public static final ThreadLocal<BinaryEncoderWithStream> encoders =
-      new ThreadLocal<BinaryEncoderWithStream>();
-  
-  /**
-   * A BinaryEncoder that exposes the outputstream so that it can be reset
-   * every time. (This is a workaround to reuse BinaryEncoder and the buffers,
-   * normally provided be EncoderFactory, but this class does not exist yet 
-   * in the current Avro version).
-   */
-  public static final class BinaryEncoderWithStream extends BinaryEncoder {
-    public BinaryEncoderWithStream(OutputStream out) {
-      super(out);
-    }
-    
-    protected OutputStream getOut() {
-      return out;
-    }
-  }
-  
+  public static final ThreadLocal<BinaryEncoder> encoders =
+      new ThreadLocal<BinaryEncoder>();
   /*
    * Create a threadlocal map for the datum readers and writers, because
    * they are not thread safe, at least not before Avro 1.4.0 (See AVRO-650).
@@ -75,23 +60,15 @@ public class HBaseByteInterface {
    * writer pair for every schema, instead of one for every thread.
    */
   
-  public static final ThreadLocal<Map<String, SpecificDatumReader<?>>> 
-    readerMaps = new ThreadLocal<Map<String, SpecificDatumReader<?>>>() {
-      protected Map<String,SpecificDatumReader<?>> initialValue() {
-        return new HashMap<String, SpecificDatumReader<?>>();
-      };
-  };
-  
-  public static final ThreadLocal<Map<String, SpecificDatumWriter<?>>> 
-    writerMaps = new ThreadLocal<Map<String, SpecificDatumWriter<?>>>() {
-      protected Map<String,SpecificDatumWriter<?>> initialValue() {
-        return new HashMap<String, SpecificDatumWriter<?>>();
-      };
-  };
+  public static final ConcurrentHashMap<String, SpecificDatumReader<?>> readerMap = 
+      new ConcurrentHashMap<String, SpecificDatumReader<?>>();
+     
+  public static final ConcurrentHashMap<String, SpecificDatumWriter<?>> writerMap = 
+      new ConcurrentHashMap<String, SpecificDatumWriter<?>>();
 
   /**
-   * Deserializes an array of bytes matching the given schema to the proper basic (enum, Utf8,...) or
-   * complex type (Persistent/Record).
+   * Deserializes an array of bytes matching the given schema to the proper basic 
+   * (enum, Utf8,...) or complex type (Persistent/Record).
    * 
    * Does not handle <code>arrays/maps</code> if not inside a <code>record</code> type.
    * 
@@ -100,7 +77,7 @@ public class HBaseByteInterface {
    * @return Enum|Utf8|ByteBuffer|Integer|Long|Float|Double|Boolean|Persistent|Null
    * @throws IOException
    */
-  @SuppressWarnings("rawtypes")
+  @SuppressWarnings({ "rawtypes" })
   public static Object fromBytes(Schema schema, byte[] val) throws IOException {
     Type type = schema.getType();
     switch (type) {
@@ -144,37 +121,28 @@ public class HBaseByteInterface {
       // => deserialize like "case RECORD"
 
     case RECORD:
-      Map<String, SpecificDatumReader<?>> readerMap = readerMaps.get();
-      PersistentDatumReader<?> reader = null ;
-            
-      // For UNION schemas, must use a specific PersistentDatumReader
+      // For UNION schemas, must use a specific SpecificDatumReader
       // from the readerMap since unions don't have own name
       // (key name in map will be "UNION-type-type-...")
-      if (schema.getType().equals(Schema.Type.UNION)) {
-        reader = (PersistentDatumReader<?>)readerMap.get(String.valueOf(schema.hashCode()));
-        if (reader == null) {
-          reader = new PersistentDatumReader(schema, false);// ignore dirty bits
-          readerMap.put(String.valueOf(schema.hashCode()), reader);
-        }
-      } else {
-        // ELSE use reader for Record
-        reader = (PersistentDatumReader<?>)readerMap.get(schema.getFullName());
-        if (reader == null) {
-          reader = new PersistentDatumReader(schema, false);// ignore dirty bits
-          readerMap.put(schema.getFullName(), reader);
+      String schemaId = schema.getType().equals(Schema.Type.UNION) ? String.valueOf(schema.hashCode()) : schema.getFullName();      
+      
+      SpecificDatumReader<?> reader = (SpecificDatumReader<?>)readerMap.get(schemaId);
+      if (reader == null) {
+        reader = new SpecificDatumReader(schema);// ignore dirty bits
+        SpecificDatumReader localReader=null;
+        if((localReader=readerMap.putIfAbsent(schemaId, reader))!=null) {
+          reader = localReader;
         }
       }
       
       // initialize a decoder, possibly reusing previous one
       BinaryDecoder decoderFromCache = decoders.get();
-      BinaryDecoder decoder=DecoderFactory.defaultFactory().
-          createBinaryDecoder(val, decoderFromCache);
+      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(val, null);
       // put in threadlocal cache if the initial get was empty
       if (decoderFromCache==null) {
         decoders.set(decoder);
       }
-      
-      return reader.read((Object)null, schema, decoder);
+      return reader.read(null, decoder);
     default: throw new RuntimeException("Unknown type: "+type);
     }
   }
@@ -255,7 +223,7 @@ public class HBaseByteInterface {
   public static byte[] toBytes(Object o, Schema schema) throws IOException {
     Type type = schema.getType();
     switch (type) {
-    case STRING:  return Bytes.toBytes(((Utf8)o).toString()); // TODO: maybe ((Utf8)o).getBytes(); ?
+    case STRING:  return Bytes.toBytes(((CharSequence)o).toString()); // TODO: maybe ((Utf8)o).getBytes(); ?
     case BYTES:   return ((ByteBuffer)o).array();
     case INT:     return Bytes.toBytes((Integer)o);
     case LONG:    return Bytes.toBytes((Long)o);
@@ -264,65 +232,26 @@ public class HBaseByteInterface {
     case BOOLEAN: return (Boolean)o ? new byte[] {1} : new byte[] {0};
     case ENUM:    return new byte[] { (byte)((Enum<?>) o).ordinal() };
     case UNION:
-      // XXX Special case: When writing the top-level field of a record we must handle the
-      // special case ["null","type"] definitions: this will be written as if it was ["type"]
-      // if not in a special case, will execute "case RECORD".
-      
-      if (schema.getTypes().size() == 2) {
-        
-        // schema [type0, type1]
-        Type type0 = schema.getTypes().get(0).getType() ;
-        Type type1 = schema.getTypes().get(1).getType() ;
-        
-        // Check if types are different and there's a "null", like ["null","type"] or ["type","null"]
-        if (!type0.equals(type1)
-            && (   type0.equals(Schema.Type.NULL)
-                || type1.equals(Schema.Type.NULL))) {
-
-          if (o == null) return null ;
-          
-          int index = GenericData.get().resolveUnion(schema, o);
-          schema = schema.getTypes().get(index) ;
-          
-          return toBytes(o, schema) ; // Serialize as if schema was ["type"] 
-        }
-        
-      }
-      // else
-      //   type = [type0,type1] where type0=type1
-      // => Serialize like "case RECORD" with Avro
-      
     case RECORD:
-      Map<String, SpecificDatumWriter<?>> writerMap = writerMaps.get();
-      PersistentDatumWriter writer = null ;
-      // For UNION schemas, must use a specific PersistentDatumReader
-      // from the readerMap since unions don't have own name
-      // (key name in map will be "UNION-type-type-...")
-      if (schema.getType().equals(Schema.Type.UNION)) {
-        writer = (PersistentDatumWriter<?>) writerMap.get(String.valueOf(schema.hashCode()));
-        if (writer == null) {
-          writer = new PersistentDatumWriter(schema,false);// ignore dirty bits
-          writerMap.put(String.valueOf(schema.hashCode()),writer);
-        }
-      } else {
-        // ELSE use writer for Record
-        writer = (PersistentDatumWriter<?>) writerMap.get(schema.getFullName());
-        if (writer == null) {
-          writer = new PersistentDatumWriter(schema,false);// ignore dirty bits
-          writerMap.put(schema.getFullName(),writer);
-        }
+      SpecificDatumWriter writer = (SpecificDatumWriter<?>) writerMap.get(schema.getFullName());
+      if (writer == null) {
+        writer = new SpecificDatumWriter(schema);// ignore dirty bits
+        writerMap.put(schema.getFullName(),writer);
       }
-      
-      BinaryEncoderWithStream encoder = encoders.get();
-      if (encoder == null) {
-        encoder = new BinaryEncoderWithStream(new ByteArrayOutputStream());
+
+      BinaryEncoder encoderFromCache = encoders.get();
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      outputStream.set(bos);
+      BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(bos, null);
+      if (encoderFromCache == null) {
         encoders.set(encoder);
       }
+
       //reset the buffers
-      ByteArrayOutputStream os = (ByteArrayOutputStream) encoder.getOut();
+      ByteArrayOutputStream os = outputStream.get();
       os.reset();
-      
-      writer.write(schema,o, encoder);
+
+      writer.write(o, encoder);
       encoder.flush();
       return os.toByteArray();
     default: throw new RuntimeException("Unknown type: "+type);

Modified: gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml (original)
+++ gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml Sat Apr 12 19:21:53 2014
@@ -44,6 +44,7 @@
     <field name="content" family="content"/>
     <field name="parsedContent" family="parsedContent"/>
     <field name="outlinks" family="outlinks"/>
+    <field name="headers" family="headers"/>
     <field name="metadata" family="common" qualifier="metadata"/>
   </class>
 

Modified: gora/trunk/gora-hbase/src/test/conf/hbase-site.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/test/conf/hbase-site.xml?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/test/conf/hbase-site.xml (original)
+++ gora/trunk/gora-hbase/src/test/conf/hbase-site.xml Sat Apr 12 19:21:53 2014
@@ -119,7 +119,6 @@
     <description>
     Maximum desired file size for an HRegion.  If filesize exceeds
     value + (value / 2), the HRegion is split in two.  Default: 256M.
-
     Keep the maximum filesize small so we split more often in tests.
     </description>
   </property>
@@ -129,9 +128,14 @@
   </property>
   <property>
     <name>hbase.zookeeper.property.clientPort</name>
-    <value>21818</value>
+    <value>2181</value>
     <description>Property from ZooKeeper's config zoo.cfg.
     The port at which the clients will connect.
     </description>
   </property>
+  <property>
+    <name>hbase.zookeeper.quorum</name>
+    <value>localhost</value>
+    <description>Comma separated list of servers in the ZooKeeper quorum.</description>
+  </property>
 </configuration>

Modified: gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java (original)
+++ gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java Sat Apr 12 19:21:53 2014
@@ -22,19 +22,22 @@ import org.apache.gora.GoraTestDriver;
 import org.apache.gora.hbase.store.HBaseStore;
 import org.apache.gora.hbase.util.HBaseClusterSingleton;
 import org.apache.hadoop.conf.Configuration;
-
-//HBase imports
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 
 /**
  * Helper class for third part tests using gora-hbase backend. 
  * @see GoraTestDriver
  */
 public class GoraHBaseTestDriver extends GoraTestDriver {
+
+  /**
+   * Cluster object used for testing.
+   */
   private static final HBaseClusterSingleton cluster = HBaseClusterSingleton.build(1);
 
+  /**
+   * Default Constructor.
+   */
   public GoraHBaseTestDriver() {
     super(HBaseStore.class);
   }
@@ -42,6 +45,7 @@ public class GoraHBaseTestDriver extends
   @Override
   public void setUpClass() throws Exception {
     super.setUpClass();
+    conf = getConf();
     log.info("Setting up HBase Test Driver");
   }
 
@@ -50,28 +54,40 @@ public class GoraHBaseTestDriver extends
     super.tearDownClass();
     log.info("Teardown HBase test driver");
   }
-  
+
   @Override
   public void setUp() throws Exception {
     cluster.truncateAllTables();
     // super.setUp() deletes all tables, but must only truncate in the right way -HBaseClusterSingleton-
     //super.setUp();
   }
-  
+
   @Override
   public void tearDown() throws Exception {
     // Do nothing. setUp() must ensure the right data.
   }
+
+  /**
+   * Deletes all tables from the MiniCluster
+   * @throws Exception in case some table is not able to be deleted.
+   */
   public void deleteAllTables() throws Exception {
     cluster.deleteAllTables();
   }
-  
+
+  /**
+   * Gets the configuration from the MiniCluster.
+   * @return Configuration from MiniCluster.
+   */
   public Configuration getConf() {
     return cluster.getHbaseTestingUtil().getConfiguration();
   }
-  
+
+  /**
+   * Gets HBaseTestingUtility from the MiniCluster object.
+   * @return HBaseTestingUtility object
+   */
   public HBaseTestingUtility getHbaseUtil() {
     return cluster.getHbaseTestingUtil();
   }
-  
-}		
+}

Modified: gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java (original)
+++ gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java Sat Apr 12 19:21:53 2014
@@ -18,7 +18,6 @@
 
 package org.apache.gora.hbase.mapreduce;
 
-import org.apache.gora.examples.generated.TokenDatum;
 import org.apache.gora.examples.generated.WebPage;
 import org.apache.gora.hbase.store.HBaseStore;
 import org.apache.gora.hbase.util.HBaseClusterSingleton;