You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by rm...@apache.org on 2014/04/12 21:21:56 UTC
svn commit: r1586888 [8/10] - in /gora/trunk: ./ bin/ gora-accumulo/
gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/
gora-accumulo/src/main/java/org/apache/gora/accumulo/query/
gora-accumulo/src/main/java/org/apache/gora/accumulo/store/ ...
Modified: gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java (original)
+++ gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java Sat Apr 12 19:21:53 2014
@@ -42,7 +42,7 @@ import static org.junit.Assert.assertFal
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
-import org.apache.avro.generic.GenericArray;
+import org.apache.avro.Schema.Field;
import org.apache.avro.util.Utf8;
import org.apache.gora.examples.WebPageDataCreator;
import org.apache.gora.examples.generated.Employee;
@@ -53,9 +53,9 @@ import org.apache.gora.persistency.impl.
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
+import org.apache.gora.util.AvroUtils;
import org.apache.gora.util.ByteUtils;
import org.apache.gora.util.StringUtils;
-import org.junit.Test;
/**
* Test utilities for DataStores. This utility class provides everything
@@ -89,7 +89,7 @@ public class DataStoreTestUtil {
public static <K> Employee createEmployee(
DataStore<K, Employee> dataStore) throws IOException, Exception {
- Employee employee = dataStore.newPersistent();
+ Employee employee = Employee.newBuilder().build();
employee.setName(new Utf8("Random Joe"));
employee.setDateOfBirth( System.currentTimeMillis() - 20L * YEAR_IN_MS );
employee.setSalary(100000);
@@ -97,12 +97,22 @@ public class DataStoreTestUtil {
return employee;
}
- public static <K> Employee createBoss(
- DataStore<K, Employee> dataStore) throws IOException, Exception {
+ private static <K> WebPage createWebPage(DataStore<K, Employee> dataStore) {
+ WebPage webpage = WebPage.newBuilder().build();
+ webpage.setUrl(new Utf8("url.."));
+ webpage.setContent(ByteBuffer.wrap("test content".getBytes()));
+ webpage.setParsedContent(new ArrayList<CharSequence>());
+ Metadata metadata = Metadata.newBuilder().build();
+ webpage.setMetadata(metadata);
+ return webpage;
+ }
- Employee employee = dataStore.newPersistent();
+ public static <K> Employee createBoss(DataStore<K, Employee> dataStore)
+ throws IOException, Exception {
+
+ Employee employee = Employee.newBuilder().build();
employee.setName(new Utf8("Random boss"));
- employee.setDateOfBirth( System.currentTimeMillis() - 22L * YEAR_IN_MS );
+ employee.setDateOfBirth(System.currentTimeMillis() - 22L * YEAR_IN_MS);
employee.setSalary(1000000);
employee.setSsn(new Utf8("202020202020"));
return employee;
@@ -159,25 +169,23 @@ public class DataStoreTestUtil {
dataStore.put(ssn, employee);
dataStore.flush();
- Employee after = dataStore.get(ssn, Employee._ALL_FIELDS);
+ Employee after = dataStore.get(ssn, AvroUtils.getSchemaFieldNames(Employee.SCHEMA$));
- assertEquals(employee, after);
+ assertEqualEmployeeObjects(employee, after);
}
-
public static void testGetEmployeeRecursive(DataStore<String, Employee> dataStore)
throws IOException, Exception {
Employee employee = DataStoreTestUtil.createEmployee(dataStore);
Employee boss = DataStoreTestUtil.createBoss(dataStore);
- employee.setBoss(boss) ;
+ employee.setBoss(boss);
String ssn = employee.getSsn().toString();
dataStore.put(ssn, employee);
dataStore.flush();
- Employee after = dataStore.get(ssn, Employee._ALL_FIELDS);
- assertEquals(employee, after);
- assertEquals(boss, after.getBoss()) ;
+ Employee after = dataStore.get(ssn, AvroUtils.getSchemaFieldNames(Employee.SCHEMA$));
+ assertEqualEmployeeObjects(employee, after);
}
public static void testGetEmployeeDoubleRecursive(DataStore<String, Employee> dataStore)
@@ -193,10 +201,8 @@ public class DataStoreTestUtil {
String ssn = employee.getSsn().toString();
dataStore.put(ssn, employee);
dataStore.flush();
- Employee after = dataStore.get(ssn, Employee._ALL_FIELDS);
- assertEquals(employee, after);
- assertEquals(boss, after.getBoss()) ;
- assertEquals(uberBoss, ((Employee)after.getBoss()).getBoss()) ;
+ Employee after = dataStore.get(ssn, AvroUtils.getSchemaFieldNames(Employee.SCHEMA$));
+ assertEqualEmployeeObjects(employee, after);
}
public static void testGetEmployeeNested(DataStore<String, Employee> dataStore)
@@ -207,7 +213,8 @@ public class DataStoreTestUtil {
webpage.setUrl(new Utf8("url..")) ;
webpage.setContent(ByteBuffer.wrap("test content".getBytes())) ;
- Metadata metadata = new BeanFactoryImpl<String,Metadata>(String.class,Metadata.class).newPersistent() ;
+ webpage.setParsedContent(new ArrayList<CharSequence>());
+ Metadata metadata = new BeanFactoryImpl<String,Metadata>(String.class,Metadata.class).newPersistent();
webpage.setMetadata(metadata) ;
employee.setWebpage(webpage) ;
@@ -215,9 +222,9 @@ public class DataStoreTestUtil {
dataStore.put(ssn, employee);
dataStore.flush();
- Employee after = dataStore.get(ssn, Employee._ALL_FIELDS);
- assertEquals(employee, after);
- assertEquals(webpage, after.getWebpage()) ;
+ Employee after = dataStore.get(ssn, AvroUtils.getSchemaFieldNames(Employee.SCHEMA$));
+ assertEqualEmployeeObjects(employee, after);
+ assertEqualWebPageObjects(webpage, after.getWebpage());
}
public static void testGetEmployee3UnionField(DataStore<String, Employee> dataStore)
@@ -225,12 +232,12 @@ public class DataStoreTestUtil {
Employee employee = DataStoreTestUtil.createEmployee(dataStore);
employee.setBoss(new Utf8("Real boss")) ;
-
+
String ssn = employee.getSsn().toString();
dataStore.put(ssn, employee);
dataStore.flush();
- Employee after = dataStore.get(ssn, Employee._ALL_FIELDS);
- assertEquals(employee, after);
+ Employee after = dataStore.get(ssn, AvroUtils.getSchemaFieldNames(Employee.SCHEMA$));
+ assertEqualEmployeeObjects(employee, after);
assertEquals("Real boss", ((Utf8)after.getBoss()).toString()) ;
}
@@ -243,32 +250,139 @@ public class DataStoreTestUtil {
public static void testGetEmployeeWithFields(DataStore<String, Employee> dataStore)
throws IOException, Exception {
Employee employee = DataStoreTestUtil.createEmployee(dataStore);
+ WebPage webpage = createWebPage(dataStore);
+ employee.setWebpage(webpage);
+ Employee boss = createBoss(dataStore);
+ employee.setBoss(boss);
String ssn = employee.getSsn().toString();
dataStore.put(ssn, employee);
dataStore.flush();
- // XXX See GORA-216: special case until later reviewed.
- // Like in K-V stores, if retrieved column does not exists ([webpage] case),
- // get() must return 'null'.
- // We prepare an actual weird synthetic test.
-
- // String[] fields = employee.getFields();
- String[] fields = {"name","dateOfBirth","ssn","salary"} ;
-
+ String[] fields = AvroUtils.getPersistentFieldNames(employee);
for(Set<String> subset : StringUtils.powerset(fields)) {
if(subset.isEmpty())
continue;
Employee after = dataStore.get(ssn, subset.toArray(new String[subset.size()]));
- Employee expected = new Employee();
+ Employee expected = Employee.newBuilder().build();
for(String field:subset) {
- int index = expected.getFieldIndex(field);
+ int index = expected.getSchema().getField(field).pos();
expected.put(index, employee.get(index));
}
- assertEquals(expected, after);
+ assertEqualEmployeeObjects(expected, after);
+ }
+ }
+
+ /**
+ * Simple function which iterates through a before (put) and after (get) object
+ * in an attempt to verify that the same fields and values have been obtained.
+ * Within the original employee object we iterate from 1 instead of 0 due to the
+ * removal of the '__g__' field at position 0 when we put objects into the datastore.
+ * This field is used to identify whether fields within the object, and
+ * consequently the object itself, are dirty; however, this field is not
+ * required when persisting the object.
+ * We explicitly get values from each field as this makes it easier to debug
+ * if tests go wrong.
+ * @param employee
+ * @param after
+ */
+ private static void assertEqualEmployeeObjects(Employee employee, Employee after) {
+ //for (int i = 1; i < employee.SCHEMA$.getFields().size(); i++) {
+ // for (int j = 1; j < after.SCHEMA$.getFields().size(); j++) {
+ // assertEquals(employee.SCHEMA$.getFields().get(i), after.SCHEMA$.getFields().get(j));
+ // }
+ //}
+ //check name field
+ CharSequence beforeName = employee.getName();
+ CharSequence afterName = after.getName();
+ assertEquals(beforeName, afterName);
+ //check dateOfBirth field
+ Long beforeDOB = employee.getDateOfBirth();
+ Long afterDOB = after.getDateOfBirth();
+ assertEquals(beforeDOB, afterDOB);
+ //check ssn field
+ CharSequence beforeSsn = employee.getSsn();
+ CharSequence afterSsn = after.getSsn();
+ assertEquals(beforeSsn, afterSsn);
+ //check salary field
+ Integer beforeSalary = employee.getSalary();
+ Integer afterSalary = after.getSalary();
+ assertEquals(beforeSalary, afterSalary);
+ //check boss field
+ if (employee.getBoss() != null) {
+ if (employee.getBoss() instanceof Utf8) {
+ String beforeBoss = employee.getBoss().toString();
+ String afterBoss = after.getBoss().toString();
+ assertEquals("Boss String field values in UNION should be the same",
+ beforeBoss, afterBoss);
+ } else {
+ Employee beforeBoss = (Employee) employee.getBoss();
+ Employee afterBoss = (Employee) after.getBoss();
+ assertEqualEmployeeObjects(beforeBoss, afterBoss);
+ }
+ }
+ //check webpage field
+ if (employee.getWebpage() != null) {
+ WebPage beforeWebPage = employee.getWebpage();
+ WebPage afterWebPage = after.getWebpage();
+ assertEqualWebPageObjects(beforeWebPage, afterWebPage);
}
}
+ /**
+ * Mimics {@link org.apache.gora.store.DataStoreTestUtil#assertEqualEmployeeObjects(Employee, Employee)}
+ * in that we pick our way through fields within before and after
+ * {@link org.apache.gora.examples.generated.WebPage} objects comparing field values.
+ * @param beforeWebPage
+ * @param afterWebPage
+ */
+ private static void assertEqualWebPageObjects(WebPage beforeWebPage, WebPage afterWebPage) {
+ //check url field
+ CharSequence beforeUrl = beforeWebPage.getUrl();
+ CharSequence afterUrl = afterWebPage.getUrl();
+ assertEquals(beforeUrl, afterUrl);
+ //check content field
+ ByteBuffer beforeContent = beforeWebPage.getContent();
+ ByteBuffer afterContent = afterWebPage.getContent();
+ assertEquals(beforeContent, afterContent);
+ //check parsedContent field
+ List<CharSequence> beforeParsedContent =
+ (List<CharSequence>) beforeWebPage.getParsedContent();
+ List<CharSequence> afterParsedContent =
+ (List<CharSequence>) afterWebPage.getParsedContent();
+ assertEquals(beforeParsedContent, afterParsedContent);
+ //check outlinks field
+ Map<CharSequence, CharSequence> beforeOutlinks =
+ (Map<java.lang.CharSequence,java.lang.CharSequence>) beforeWebPage.getOutlinks();
+ Map<CharSequence, CharSequence> afterOutlinks =
+ (Map<java.lang.CharSequence,java.lang.CharSequence>) afterWebPage.getOutlinks();
+ assertEquals(beforeOutlinks, afterOutlinks);
+ //check metadata field
+ if (beforeWebPage.get(5) != null) {
+ Metadata beforeMetadata = beforeWebPage.getMetadata();
+ Metadata afterMetadata = afterWebPage.getMetadata();
+ assertEqualMetadataObjects(beforeMetadata, afterMetadata);
+ }
+ }
+
+ /**
+ * Mimics {@link org.apache.gora.store.DataStoreTestUtil#assertEqualEmployeeObjects(Employee, Employee)}
+ * in that we pick our way through fields within before and after
+ * {@link org.apache.gora.examples.generated.Metadata} objects comparing field values.
+ * @param beforeMetadata
+ * @param afterMetadata
+ */
+ private static void assertEqualMetadataObjects(Metadata beforeMetadata, Metadata afterMetadata) {
+ //check version field
+ int beforeVersion = beforeMetadata.getVersion();
+ int afterVersion = afterMetadata.getVersion();
+ assertEquals(beforeVersion, afterVersion);
+ //check data field
+ Map<CharSequence, CharSequence> beforeData = beforeMetadata.getData();
+ Map<CharSequence, CharSequence> afterData = afterMetadata.getData();
+ assertEquals(beforeData, afterData);
+ }
+
public static Employee testPutEmployee(DataStore<String, Employee> dataStore)
throws IOException, Exception {
dataStore.createSchema();
@@ -306,6 +420,16 @@ public class DataStoreTestUtil {
assertNull(employee);
}
+ /**
+ * Here we create 5 {@link org.apache.gora.examples.generated.Employee} objects
+ * before populating fields with data and flushing them to the datastore.
+ * We then update the first of the {@link org.apache.gora.examples.generated.Employee} objects
+ * with more data and flush this data. Assertions are then made over the updated
+ * {@link org.apache.gora.examples.generated.Employee} object.
+ * @param dataStore
+ * @throws IOException
+ * @throws Exception
+ */
public static void testUpdateEmployee(DataStore<String, Employee> dataStore)
throws IOException, Exception {
dataStore.createSchema();
@@ -313,7 +437,7 @@ public class DataStoreTestUtil {
long now = System.currentTimeMillis();
for (int i = 0; i < 5; i++) {
- Employee employee = dataStore.newPersistent();
+ Employee employee = Employee.newBuilder().build();
employee.setName(new Utf8("John Doe " + i));
employee.setDateOfBirth(now - 20L * YEAR_IN_MS);
employee.setSalary(100000);
@@ -324,9 +448,9 @@ public class DataStoreTestUtil {
dataStore.flush();
for (int i = 0; i < 1; i++) {
- Employee employee = dataStore.newPersistent();
+ Employee employee = Employee.newBuilder().build();
employee.setName(new Utf8("John Doe " + (i + 5)));
- employee.setDateOfBirth(now - 18L * YEAR_IN_MS);
+ employee.setDateOfBirth(now - 18L * YEAR_IN_MS);
employee.setSalary(120000);
employee.setSsn(new Utf8(Long.toString(ssn + i)));
dataStore.put(employee.getSsn().toString(), employee);
@@ -337,33 +461,42 @@ public class DataStoreTestUtil {
for (int i = 0; i < 1; i++) {
String key = Long.toString(ssn + i);
Employee employee = dataStore.get(key);
- assertEquals(now - 18L * YEAR_IN_MS, employee.getDateOfBirth());
+ assertEquals(now - 18L * YEAR_IN_MS, employee.getDateOfBirth().longValue());
assertEquals("John Doe " + (i + 5), employee.getName().toString());
- assertEquals(120000, employee.getSalary());
+ assertEquals(120000, employee.getSalary().intValue());
}
}
- public static void testUpdateWebPage(DataStore<String, WebPage> dataStore)
+ /**
+ * Here we create 7 {@link org.apache.gora.examples.generated.WebPage}
+ * objects and populate field data before flushing the objects to the
+ * datastore. We then get the objects, setting the 'content' field and
+ * appending further entries to the 'parsedContent' array field.
+ * This data is then flushed to the datastore.
+ * Finally we get the {@link org.apache.gora.examples.generated.WebPage}
+ * objects and make various assertions over various fields. This tests
+ * that we can update fields and that data can be written and read correctly.
+ * @param dataStore
+ * @throws IOException
+ * @throws Exception
+ */
+ public static void testUpdateWebPagePutToArray(DataStore<String, WebPage> dataStore)
throws IOException, Exception {
dataStore.createSchema();
String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
- "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g"};
+ "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
String content = "content";
String parsedContent = "parsedContent";
- String anchor = "anchor";
int parsedContentCount = 0;
for (int i = 0; i < urls.length; i++) {
- WebPage webPage = dataStore.newPersistent();
+ WebPage webPage = WebPage.newBuilder().build();
webPage.setUrl(new Utf8(urls[i]));
for (parsedContentCount = 0; parsedContentCount < 5; parsedContentCount++) {
- webPage.addToParsedContent(new Utf8(parsedContent + i + "," + parsedContentCount));
- }
- for (int j = 0; j < urls.length; j += 2) {
- webPage.putToOutlinks(new Utf8(anchor + j), new Utf8(urls[j]));
+ webPage.getParsedContent().add(new Utf8(parsedContent + i + "," + parsedContentCount));
}
dataStore.put(webPage.getUrl().toString(), webPage);
}
@@ -374,15 +507,7 @@ public class DataStoreTestUtil {
WebPage webPage = dataStore.get(urls[i]);
webPage.setContent(ByteBuffer.wrap(ByteUtils.toBytes(content + i)));
for (parsedContentCount = 5; parsedContentCount < 10; parsedContentCount++) {
- webPage.addToParsedContent(new Utf8(parsedContent + i + "," + parsedContentCount));
- }
- webPage.getOutlinks().clear();
- for (int j = 1; j < urls.length; j += 2) {
- webPage.putToOutlinks(new Utf8(anchor + j), new Utf8(urls[j]));
- }
- //test for double put of same entries
- for (int j = 1; j < urls.length; j += 2) {
- webPage.putToOutlinks(new Utf8(anchor + j), new Utf8(urls[j]));
+ webPage.getParsedContent().add(new Utf8(parsedContent + i + "," + parsedContentCount));
}
dataStore.put(webPage.getUrl().toString(), webPage);
}
@@ -394,24 +519,90 @@ public class DataStoreTestUtil {
assertEquals(content + i, ByteUtils.toString( toByteArray(webPage.getContent()) ));
assertEquals(10, webPage.getParsedContent().size());
int j = 0;
- for (Utf8 pc : webPage.getParsedContent()) {
+ for (CharSequence pc : webPage.getParsedContent()) {
assertEquals(parsedContent + i + "," + j, pc.toString());
j++;
}
+ }
+ }
+
+ public static void testUpdateWebPagePutToNotNullableMap(DataStore<String, WebPage> dataStore)
+ throws IOException, Exception {
+ dataStore.createSchema();
+
+ String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+ "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
+ String anchor = "anchor";
+
+ // putting evens
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = WebPage.newBuilder().build();
+ webPage.setUrl(new Utf8(urls[i]));
+ for (int j = 0; j < urls.length; j += 2) {
+ webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
+ }
+ dataStore.put(webPage.getUrl().toString(), webPage);
+ }
+ dataStore.flush();
+
+ // putting odds
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = dataStore.get(urls[i]);
+ webPage.getOutlinks().clear();
+ for (int j = 1; j < urls.length; j += 2) {
+ webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
+ }
+ // test for double put of same entries
+ for (int j = 1; j < urls.length; j += 2) {
+ webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
+ }
+ dataStore.put(webPage.getUrl().toString(), webPage);
+ }
+ dataStore.flush();
+
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = dataStore.get(urls[i]);
int count = 0;
- for (j = 1; j < urls.length; j += 2) {
- Utf8 link = webPage.getOutlinks().get(new Utf8(anchor + j));
+ for (int j = 1; j < urls.length; j += 2) {
+ CharSequence link = webPage.getOutlinks().get(new Utf8(anchor + j));
assertNotNull(link);
assertEquals(urls[j], link.toString());
count++;
}
assertEquals(count, webPage.getOutlinks().size());
}
+ }
+
+ public static void testUpdateWebPagePutToNullableMap(DataStore<String, WebPage> dataStore)
+ throws IOException, Exception {
+ dataStore.createSchema();
+
+ String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+ "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
+ String header = "header";
+ String[] headers = { "firstHeader", "secondHeader", "thirdHeader",
+ "fourthHeader", "fifthHeader", "sixthHeader" };
+
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = WebPage.newBuilder().build();
+ webPage.setUrl(new Utf8(urls[i]));
+ //test put for nullable map field
+ // we put data to the 'headers' field which is a Map with default value of 'null'
+ webPage.setHeaders(new HashMap<CharSequence, CharSequence>());
+ for (int j = 0; j < headers.length; j += 2) {
+ webPage.getHeaders().put(new Utf8(header + j), new Utf8(headers[j]));
+ }
+ dataStore.put(webPage.getUrl().toString(), webPage);
+ }
+
+ dataStore.flush();
for (int i = 0; i < urls.length; i++) {
WebPage webPage = dataStore.get(urls[i]);
- for (int j = 0; j < urls.length; j += 2) {
- webPage.putToOutlinks(new Utf8(anchor + j), new Utf8(urls[j]));
+ //webPage.getHeaders().clear(); //TODO clear method does not work
+ webPage.setHeaders(new HashMap<CharSequence, CharSequence>());
+ for (int j = 1; j < headers.length; j += 2) {
+ webPage.getHeaders().put(new Utf8(header + j), new Utf8(headers[j]));
}
dataStore.put(webPage.getUrl().toString(), webPage);
}
@@ -421,12 +612,93 @@ public class DataStoreTestUtil {
for (int i = 0; i < urls.length; i++) {
WebPage webPage = dataStore.get(urls[i]);
int count = 0;
+ for (int j = 1; j < headers.length; j += 2) {
+ CharSequence headerSample = webPage.getHeaders().get(new Utf8(header + j));
+ assertNotNull(headerSample);
+ assertEquals(headers[j], headerSample.toString());
+ count++;
+ }
+ assertEquals(count, webPage.getHeaders().size());
+ }
+ }
+
+ public static void testUpdateWebPageRemoveMapEntry(DataStore<String, WebPage> dataStore)
+ throws IOException, Exception {
+ dataStore.createSchema();
+
+ String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+ "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
+ String anchor = "anchor";
+
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = WebPage.newBuilder().build();
+ webPage.setUrl(new Utf8(urls[i]));
for (int j = 0; j < urls.length; j++) {
- Utf8 link = webPage.getOutlinks().get(new Utf8(anchor + j));
- assertNotNull(link);
- assertEquals(urls[j], link.toString());
+ webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
+ }
+ dataStore.put(webPage.getUrl().toString(), webPage);
+ }
+
+ dataStore.flush();
+
+ // map entry removal test
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = dataStore.get(urls[i]);
+ for (int j = 1; j < urls.length; j += 2) {
+ webPage.getOutlinks().remove(new Utf8(anchor + j));
+ }
+ dataStore.put(webPage.getUrl().toString(), webPage);
+ }
+
+ dataStore.flush();
+
+ for (int i = 0; i < urls.length; i++) {
+ int count = 0;
+ WebPage webPage = dataStore.get(urls[i]);
+ for (int j = 1; j < urls.length; j += 2) {
+ CharSequence link = webPage.getOutlinks().get(new Utf8(anchor + j));
+ assertNull(link);
+ //assertEquals(urls[j], link.toString());
count++;
}
+ assertEquals(urls.length - count, webPage.getOutlinks().size());
+ }
+ }
+
+ public static void testUpdateWebPageRemoveField(DataStore<String, WebPage> dataStore)
+ throws IOException, Exception {
+ dataStore.createSchema();
+
+ String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+ "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
+ String header = "header";
+ String[] headers = { "firstHeader", "secondHeader", "thirdHeader",
+ "fourthHeader", "fifthHeader", "sixthHeader" };
+
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = WebPage.newBuilder().build();
+ webPage.setUrl(new Utf8(urls[i]));
+ webPage.setHeaders(new HashMap<CharSequence, CharSequence>());
+ for (int j = 0; j < headers.length; j++) {
+ webPage.getHeaders().put(new Utf8(header + j), new Utf8(headers[j]));
+ }
+ dataStore.put(webPage.getUrl().toString(), webPage);
+ }
+
+ dataStore.flush();
+
+ // nullable map field removal test
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = dataStore.get(urls[i]);
+ webPage.setHeaders(null);
+ dataStore.put(webPage.getUrl().toString(), webPage);
+ }
+
+ dataStore.flush();
+
+ for (int i = 0; i < urls.length; i++) {
+ WebPage webPage = dataStore.get(urls[i]);
+ assertNull(webPage.getHeaders());
}
}
@@ -440,19 +712,20 @@ public class DataStoreTestUtil {
" actual=" + CONTENTS[i] + " i=" + i
, Arrays.equals( toByteArray(page.getContent() )
, CONTENTS[i].getBytes()));
- GenericArray<Utf8> parsedContent = page.getParsedContent();
+
+ List<CharSequence> parsedContent = page.getParsedContent();
assertNotNull(parsedContent);
assertTrue(parsedContent.size() > 0);
int j=0;
String[] tokens = CONTENTS[i].split(" ");
- for(Utf8 token : parsedContent) {
+ for(CharSequence token : parsedContent) {
assertEquals(tokens[j++], token.toString());
}
} else {
// when page.getContent() is null
assertTrue(CONTENTS[i] == null) ;
- GenericArray<Utf8> parsedContent = page.getParsedContent();
+ List<CharSequence> parsedContent = page.getParsedContent();
assertNotNull(parsedContent);
assertTrue(parsedContent.size() == 0);
}
@@ -462,7 +735,7 @@ public class DataStoreTestUtil {
assertTrue(page.getOutlinks().size() > 0);
for(int k=0; k<LINKS[i].length; k++) {
assertEquals(ANCHORS[i][k],
- page.getFromOutlinks(new Utf8(URLS[LINKS[i][k]])).toString());
+ page.getOutlinks().get(new Utf8(URLS[LINKS[i][k]])).toString());
}
} else {
assertTrue(page.getOutlinks() == null || page.getOutlinks().isEmpty());
@@ -480,7 +753,7 @@ public class DataStoreTestUtil {
}
public static void testGetWebPage(DataStore<String, WebPage> store) throws IOException, Exception {
- testGetWebPage(store, WebPage._ALL_FIELDS);
+ testGetWebPage(store, getFields(WebPage.SCHEMA$.getFields()));
}
public static void testGetWebPageDefaultFields(DataStore<String, WebPage> store)
@@ -507,7 +780,7 @@ public class DataStoreTestUtil {
public static void testQueryWebPageSingleKey(DataStore<String, WebPage> store)
throws IOException, Exception {
- testQueryWebPageSingleKey(store, WebPage._ALL_FIELDS);
+ testQueryWebPageSingleKey(store, getFields(WebPage.SCHEMA$.getFields()));
}
public static void testQueryWebPageSingleKeyDefaultFields(
@@ -714,7 +987,7 @@ public class DataStoreTestUtil {
WebPageDataCreator.createWebPageData(store);
query = store.newQuery();
- query.setFields(WebPage._ALL_FIELDS);
+ query.setFields(AvroUtils.getSchemaFieldNames(WebPage.SCHEMA$));
assertNumResults(store.newQuery(), URLS.length);
store.deleteByQuery(query);
@@ -757,8 +1030,8 @@ public class DataStoreTestUtil {
WebPageDataCreator.createWebPageData(store);
query = store.newQuery();
- query.setFields(WebPage.Field.OUTLINKS.getName()
- , WebPage.Field.PARSED_CONTENT.getName(), WebPage.Field.CONTENT.getName());
+ query.setFields("outlinks"
+ , "parsedContent", "content");
assertNumResults(store.newQuery(), URLS.length);
store.deleteByQuery(query);
@@ -776,7 +1049,8 @@ public class DataStoreTestUtil {
assertNotNull(page.getUrl());
assertEquals(page.getUrl().toString(), SORTED_URLS[i]);
- assertEquals(0, page.getOutlinks().size());
+ assertEquals("Map of Outlinks should have a size of '0' as the deleteByQuery "
+ + "not only removes the data but also the data structure.", 0, page.getOutlinks().size());
assertEquals(0, page.getParsedContent().size());
if(page.getContent() != null) {
System.out.println("url:" + page.getUrl().toString());
@@ -790,7 +1064,7 @@ public class DataStoreTestUtil {
WebPageDataCreator.createWebPageData(store);
query = store.newQuery();
- query.setFields(WebPage.Field.URL.getName());
+ query.setFields("url");
String startKey = SORTED_URLS[NUM_KEYS];
String endKey = SORTED_URLS[SORTED_URLS.length - NUM_KEYS];
query.setStartKey(startKey);
@@ -832,10 +1106,10 @@ public class DataStoreTestUtil {
String url = "http://foo.com/";
store.createSchema();
- WebPage page = store.newPersistent();
- Metadata metadata = new Metadata();
+ WebPage page = WebPage.newBuilder().build();
+ Metadata metadata = Metadata.newBuilder().build();
metadata.setVersion(1);
- metadata.putToData(new Utf8("foo"), new Utf8("baz"));
+ metadata.getData().put(new Utf8("foo"), new Utf8("baz"));
page.setMetadata(metadata);
page.setUrl(new Utf8(url));
@@ -846,30 +1120,31 @@ public class DataStoreTestUtil {
page = store.get(revUrl);
metadata = page.getMetadata();
assertNotNull(metadata);
- assertEquals(1, metadata.getVersion());
+ assertEquals(1, metadata.getVersion().intValue());
assertEquals(new Utf8("baz"), metadata.getData().get(new Utf8("foo")));
}
public static void testPutArray(DataStore<String, WebPage> store)
throws IOException, Exception {
store.createSchema();
- WebPage page = store.newPersistent();
+ WebPage page = WebPage.newBuilder().build();
String[] tokens = {"example", "content", "in", "example.com"};
-
+ page.setParsedContent(new ArrayList<CharSequence>());
for(String token: tokens) {
- page.addToParsedContent(new Utf8(token));
+ page.getParsedContent().add(new Utf8(token));
}
store.put("com.example/http", page);
store.close();
+
}
public static byte[] testPutBytes(DataStore<String, WebPage> store)
throws IOException, Exception {
store.createSchema();
- WebPage page = store.newPersistent();
+ WebPage page = WebPage.newBuilder().build();
page.setUrl(new Utf8("http://example.com"));
byte[] contentBytes = "example content in example.com".getBytes();
ByteBuffer buff = ByteBuffer.wrap(contentBytes);
@@ -886,12 +1161,12 @@ public class DataStoreTestUtil {
store.createSchema();
- WebPage page = store.newPersistent();
+ WebPage page = WebPage.newBuilder().build();
page.setUrl(new Utf8("http://example.com"));
- page.putToOutlinks(new Utf8("http://example2.com"), new Utf8("anchor2"));
- page.putToOutlinks(new Utf8("http://example3.com"), new Utf8("anchor3"));
- page.putToOutlinks(new Utf8("http://example3.com"), new Utf8("anchor4"));
+ page.getOutlinks().put(new Utf8("http://example2.com"), new Utf8("anchor2"));
+ page.getOutlinks().put(new Utf8("http://example3.com"), new Utf8("anchor3"));
+ page.getOutlinks().put(new Utf8("http://example3.com"), new Utf8("anchor4"));
store.put("com.example/http", page);
store.close();
}
@@ -905,5 +1180,23 @@ public class DataStoreTestUtil {
}
return bytes;
}
-
+
+ public static String[] getFields(List<Field> schemaFields) {
+
+ List<Field> list = new ArrayList<Field>();
+ for (Field field : schemaFields) {
+ if (!Persistent.DIRTY_BYTES_FIELD_NAME.equalsIgnoreCase(field.name())) {
+ list.add(field);
+ }
+ }
+ schemaFields = list;
+
+ String[] fieldNames = new String[schemaFields.size()];
+ for(int i = 0; i<fieldNames.length; i++ ){
+ fieldNames[i] = schemaFields.get(i).name();
+ }
+
+ return fieldNames;
+ }
+
}
Modified: gora/trunk/gora-core/src/test/java/org/apache/gora/store/TestDataStoreFactory.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/test/java/org/apache/gora/store/TestDataStoreFactory.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-core/src/test/java/org/apache/gora/store/TestDataStoreFactory.java (original)
+++ gora/trunk/gora-core/src/test/java/org/apache/gora/store/TestDataStoreFactory.java Sat Apr 12 19:21:53 2014
@@ -20,9 +20,9 @@ package org.apache.gora.store;
import java.util.Properties;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotSame;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
import org.apache.gora.avro.store.DataFileAvroStore;
import org.apache.gora.mock.persistency.MockPersistent;
Modified: gora/trunk/gora-core/src/test/java/org/apache/gora/util/TestIOUtils.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/test/java/org/apache/gora/util/TestIOUtils.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-core/src/test/java/org/apache/gora/util/TestIOUtils.java (original)
+++ gora/trunk/gora-core/src/test/java/org/apache/gora/util/TestIOUtils.java Sat Apr 12 19:21:53 2014
@@ -27,11 +27,10 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
-import org.apache.avro.ipc.ByteBufferInputStream;
-import org.apache.avro.ipc.ByteBufferOutputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.avro.util.ByteBufferInputStream;
+import org.apache.avro.util.ByteBufferOutputStream;
import org.apache.gora.mapreduce.GoraMapReduceUtils;
+import org.apache.gora.util.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -39,6 +38,8 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Test case for {@link IOUtils} class.
Modified: gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java (original)
+++ gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java Sat Apr 12 19:21:53 2014
@@ -36,7 +36,6 @@ import org.apache.gora.persistency.Persi
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
-import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.store.ws.impl.WSDataStoreBase;
import org.apache.gora.util.GoraException;
import org.slf4j.Logger;
@@ -162,11 +161,11 @@ public class DynamoDBStore<K, T extends
LOG.debug("Initializing DynamoDB store");
getCredentials();
setWsProvider(wsProvider);
- preferredSchema = DataStoreFactory.findProperty(properties, this, PREF_SCH_NAME, null);
- dynamoDBClient = getClient(DataStoreFactory.findProperty(properties, this, CLI_TYP_PROP, null),(AWSCredentials)getConf());
- dynamoDBClient.setEndpoint(DataStoreFactory.findProperty(properties, this, ENDPOINT_PROP, null));
+ preferredSchema = properties.getProperty(PREF_SCH_NAME);
+ dynamoDBClient = getClient(properties.getProperty(CLI_TYP_PROP),(AWSCredentials)getConf());
+ dynamoDBClient.setEndpoint(properties.getProperty(ENDPOINT_PROP));
mapping = readMapping();
- consistency = DataStoreFactory.findProperty(properties, this, CONSISTENCY_READS, null);
+ consistency = properties.getProperty(CONSISTENCY_READS);
persistentClass = pPersistentClass;
}
catch (Exception e) {
Modified: gora/trunk/gora-hbase/pom.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/pom.xml?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/pom.xml (original)
+++ gora/trunk/gora-hbase/pom.xml Sat Apr 12 19:21:53 2014
@@ -100,6 +100,7 @@
<dependency>
<groupId>org.apache.gora</groupId>
<artifactId>gora-core</artifactId>
+ <scope>compile</scope>
</dependency>
<dependency>
@@ -108,45 +109,49 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <!-- END of Gora Internal Dependencies -->
- <!-- Hadoop Dependencies -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
+ <scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
- <type>test-jar</type>
- </dependency>
-
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
+ <groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
+ <scope>compile</scope>
</dependency>
<!-- Misc Dependencies -->
<dependency>
<groupId>org.jdom</groupId>
<artifactId>jdom</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>compile</scope>
</dependency>
<!-- Logging Dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
+ <scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>javax.jms</groupId>
@@ -154,18 +159,28 @@
</exclusion>
</exclusions>
</dependency>
+ <!-- END of Logging Dependencies -->
<!-- Testing Dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-test</artifactId>
+ <scope>test</scope>
</dependency>
-
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <!-- END of Testing Dependencies -->
</dependencies>
</project>
Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java Sat Apr 12 19:21:53 2014
@@ -21,7 +21,6 @@ package org.apache.gora.hbase.query;
import java.io.IOException;
import org.apache.gora.hbase.store.HBaseStore;
-import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
import org.apache.hadoop.hbase.client.Get;
Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java Sat Apr 12 19:21:53 2014
@@ -23,7 +23,6 @@ import static org.apache.gora.hbase.util
import java.io.IOException;
import org.apache.gora.hbase.store.HBaseStore;
-import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
import org.apache.gora.query.impl.ResultBase;
Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Sat Apr 12 19:21:53 2014
@@ -25,7 +25,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -36,21 +35,15 @@ import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericArray;
import org.apache.avro.util.Utf8;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.gora.hbase.query.HBaseGetResult;
import org.apache.gora.hbase.query.HBaseQuery;
import org.apache.gora.hbase.query.HBaseScannerResult;
import org.apache.gora.hbase.store.HBaseMapping.HBaseMappingBuilder;
import org.apache.gora.hbase.util.HBaseByteInterface;
import org.apache.gora.hbase.util.HBaseFilterUtil;
-import org.apache.gora.persistency.ListGenericArray;
-import org.apache.gora.persistency.State;
-import org.apache.gora.persistency.StateManager;
-import org.apache.gora.persistency.StatefulHashMap;
-import org.apache.gora.persistency.StatefulMap;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
@@ -74,6 +67,8 @@ import org.apache.hadoop.hbase.util.Pair
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.input.SAXBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* DataStore for HBase. Thread safe.
@@ -135,7 +130,7 @@ implements Configurable {
} catch (Exception e) {
throw new RuntimeException(e);
}
-
+
// Set scanner caching option
try {
this.setScannerCaching(
@@ -146,7 +141,7 @@ implements Configurable {
LOG.error("Can not load " + SCANNER_CACHING_PROPERTIES_KEY + " from gora.properties. Setting to default value: " + SCANNER_CACHING_PROPERTIES_DEFAULT, e) ;
this.setScannerCaching(SCANNER_CACHING_PROPERTIES_DEFAULT) ; // Default value if something is wrong
}
-
+
if(autoCreateSchema) {
createSchema();
}
@@ -225,126 +220,117 @@ implements Configurable {
}
/**
- * {@inheritDoc}
- * Serializes the Persistent data and saves in HBase.
- * Topmost fields of the record are persisted in "raw" format (not avro serialized). This behavior happens
- * in maps and arrays too.
+ * {@inheritDoc} Serializes the Persistent data and saves in HBase. Topmost
+ * fields of the record are persisted in "raw" format (not avro serialized).
+ * This behavior happens in maps and arrays too.
*
- * ["null","type"] type (a.k.a. optional field) is persisted like as if it is ["type"], but the column get
- * deleted if value==null (so value read after will be null).
+ * ["null","type"] type (a.k.a. optional field) is persisted like as if it is
+ * ["type"], but the column get deleted if value==null (so value read after
+ * will be null).
*
- * @param persistent Record to be persisted in HBase
+ * @param persistent
+ * Record to be persisted in HBase
*/
- @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void put(K key, T persistent) {
- try{
+ try {
Schema schema = persistent.getSchema();
- StateManager stateManager = persistent.getStateManager();
byte[] keyRaw = toBytes(key);
Put put = new Put(keyRaw);
Delete delete = new Delete(keyRaw);
- boolean hasPuts = false;
- boolean hasDeletes = false;
- Iterator<Field> iter = schema.getFields().iterator();
- for (int i = 0; iter.hasNext(); i++) {
- Field field = iter.next();
- if (!stateManager.isDirty(persistent, i)) {
+ List<Field> fields = schema.getFields();
+ // Only dirty fields are translated into mutations.
+ // NOTE(review): iteration starts at index 1, so field 0 is never written -
+ // presumably a reserved bookkeeping field of the record; confirm.
+ for (int i = 1; i < fields.size(); i++) {
+ if (!persistent.isDirty(i)) {
continue;
}
- Type type = field.schema().getType();
+ Field field = fields.get(i);
Object o = persistent.get(i);
HBaseColumn hcol = mapping.getColumn(field.name());
if (hcol == null) {
- throw new RuntimeException("HBase mapping for field ["+ persistent.getClass().getName() +
- "#"+ field.name()+"] not found. Wrong gora-hbase-mapping.xml?");
- }
- switch(type) {
- case MAP:
- if(o instanceof StatefulMap) {
- StatefulHashMap<Utf8, ?> map = (StatefulHashMap<Utf8, ?>) o;
- for (Entry<Utf8, State> e : map.states().entrySet()) {
- Utf8 mapKey = e.getKey();
- switch (e.getValue()) {
- case DIRTY:
- byte[] qual = Bytes.toBytes(mapKey.toString());
- byte[] val = toBytes(map.get(mapKey), field.schema().getValueType());
- // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
- if (val == null) { // value == null => must delete the column
- delete.deleteColumn(hcol.getFamily(), qual);
- hasDeletes = true;
- } else {
- put.add(hcol.getFamily(), qual, val);
- hasPuts = true;
- }
- break;
- case DELETED:
- qual = Bytes.toBytes(mapKey.toString());
- hasDeletes = true;
- delete.deleteColumn(hcol.getFamily(), qual);
- break;
- default :
- break;
- }
- }
- } else {
- Set<Map.Entry> set = ((Map)o).entrySet();
- for(Entry entry: set) {
- byte[] qual = toBytes(entry.getKey());
- byte[] val = toBytes(entry.getValue(), field.schema().getValueType());
- // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
- if (val == null) { // value == null => must delete the column
- delete.deleteColumn(hcol.getFamily(), qual);
- hasDeletes = true;
- } else {
- put.add(hcol.getFamily(), qual, val);
- hasPuts = true;
- }
- }
- }
- break;
- case ARRAY:
- if(o instanceof GenericArray) {
- GenericArray arr = (GenericArray) o;
- int j=0;
- for(Object item : arr) {
- byte[] val = toBytes(item, field.schema().getElementType());
- // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
- if (val == null) { // value == null => must delete the column
- delete.deleteColumn(hcol.getFamily(), Bytes.toBytes(j++));
- hasDeletes = true;
- } else {
- put.add(hcol.getFamily(), Bytes.toBytes(j++), val);
- hasPuts = true;
- }
- }
- }
- break;
- default:
- // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
- byte[] serializedBytes = toBytes(o, field.schema()) ;
- if (serializedBytes == null) { // value == null => must delete the column
- delete.deleteColumn(hcol.getFamily(), hcol.getQualifier());
- hasDeletes = true;
- } else {
- put.add(hcol.getFamily(), hcol.getQualifier(), serializedBytes);
- hasPuts = true;
- }
- break;
+ throw new RuntimeException("HBase mapping for field ["
+ + persistent.getClass().getName() + "#" + field.name()
+ + "] not found. Wrong gora-hbase-mapping.xml?");
}
+ addPutsAndDeletes(put, delete, o, field.schema().getType(),
+ field.schema(), hcol, hcol.getQualifier());
}
- if (hasPuts) {
+ if (put.size() > 0) {
table.put(put);
}
- if (hasDeletes) {
+ if (delete.size() > 0) {
table.delete(delete);
+ table.delete(delete);
+ table.delete(delete); // HBase sometimes does not delete arbitrarily
}
- } catch(IOException ex2){
+ } catch (IOException ex2) {
- LOG.error(ex2.getMessage());
- LOG.error(ex2.getStackTrace().toString());
+ // Hand the exception itself to the logger so the real stack trace is
+ // recorded; calling toString() on the StackTraceElement[] only yields the
+ // array's type and hash code. The failure is still logged, not rethrown.
+ LOG.error(ex2.getMessage(), ex2);
}
}
+ /**
+  * Recursively turns one value of a record into HBase mutations: cell writes
+  * are added to {@code put}, cell or family removals to {@code delete}.
+  * <ul>
+  * <li>UNION: a null value of a union that has a "null" branch is deleted
+  * (the whole family when no qualifier is given). Otherwise a union resolved
+  * to branch 0 or 1 is handled as that branch, and any other union is
+  * serialized as a whole with {@code toBytes}.</li>
+  * <li>MAP: whatever is addressed by the family/qualifier is deleted first,
+  * then every entry is handled under a qualifier built from its key.</li>
+  * <li>ARRAY: every element is handled under a qualifier built from its
+  * position in the list.</li>
+  * <li>Anything else is serialized with {@code toBytes} and put.</li>
+  * </ul>
+  *
+  * @param put collects the cells to write
+  * @param delete collects the cells or families to remove
+  * @param o the value to persist
+  * @param type the Avro type selecting the handling; callers pass
+  *          {@code schema.getType()}
+  * @param schema the Avro schema describing {@code o}
+  * @param hcol supplies the column family
+  * @param qualifier the column qualifier, or null to address the whole
+  *          family when deleting
+  * @throws IOException declared for the serialization and mutation calls
+  */
+ private void addPutsAndDeletes(Put put, Delete delete, Object o, Type type,
+     Schema schema, HBaseColumn hcol, byte[] qualifier) throws IOException {
+   switch (type) {
+   case UNION: {
+     if (isNullable(schema) && o == null) {
+       // Optional value that is absent: remove what is stored for it.
+       if (qualifier != null) {
+         delete.deleteColumn(hcol.getFamily(), qualifier);
+       } else {
+         delete.deleteFamily(hcol.getFamily());
+       }
+       return;
+     }
+     int branch = getResolvedUnionIndex(schema);
+     if (branch <= 1) {
+       // Union resolved to a single branch: persist the value as that branch.
+       Schema branchSchema = schema.getTypes().get(branch);
+       addPutsAndDeletes(put, delete, o, branchSchema.getType(), branchSchema,
+           hcol, qualifier);
+     } else {
+       // Unresolved union: serialize the value against the union directly for now.
+       byte[] encoded = toBytes(o, schema);
+       put.add(hcol.getFamily(), qualifier, encoded);
+     }
+     return;
+   }
+   case MAP: {
+     // A modified map replaces the stored content, because it is not known
+     // which of its entries changed.
+     if (qualifier != null) {
+       delete.deleteColumn(hcol.getFamily(), qualifier);
+     } else {
+       delete.deleteFamily(hcol.getFamily());
+     }
+     @SuppressWarnings({ "rawtypes", "unchecked" })
+     Set<Entry> entries = ((Map) o).entrySet();
+     for (@SuppressWarnings("rawtypes") Entry entry : entries) {
+       byte[] entryQualifier = toBytes(entry.getKey());
+       Schema valueSchema = schema.getValueType();
+       addPutsAndDeletes(put, delete, entry.getValue(), valueSchema.getType(),
+           valueSchema, hcol, entryQualifier);
+     }
+     return;
+   }
+   case ARRAY: {
+     int position = 0;
+     for (Object element : (List<?>) o) {
+       Schema elementSchema = schema.getElementType();
+       addPutsAndDeletes(put, delete, element, elementSchema.getType(),
+           elementSchema, hcol, Bytes.toBytes(position));
+       position++;
+     }
+     return;
+   }
+   default: {
+     byte[] encoded = toBytes(o, schema);
+     put.add(hcol.getFamily(), qualifier, encoded);
+   }
+   }
+ }
+
+ /**
+  * Tells whether a union schema lists "null" among its branches.
+  *
+  * @param unionSchema the union schema whose branch types are inspected
+  * @return true if at least one branch has type NULL, false otherwise
+  */
+ private boolean isNullable(Schema unionSchema) {
+   List<Schema> branches = unionSchema.getTypes();
+   for (int i = 0; i < branches.size(); i++) {
+     Type branchType = branches.get(i).getType();
+     if (branchType.equals(Schema.Type.NULL)) {
+       return true;
+     }
+   }
+   return false;
+ }
+
public void delete(T obj) {
throw new RuntimeException("Not implemented yet");
}
@@ -373,8 +359,7 @@ implements Configurable {
String[] fields = getFieldsToQuery(query.getFields());
//find whether all fields are queried, which means that complete
//rows will be deleted
- boolean isAllFields = Arrays.equals(fields
- , getBeanFactory().getCachedPersistent().getFields());
+ boolean isAllFields = Arrays.equals(fields, getFields());
org.apache.gora.query.Result<K, T> result = null;
result = query.execute();
@@ -519,18 +504,28 @@ implements Configurable {
"Wrong gora-hbase-mapping.xml?");
}
Schema fieldSchema = fieldMap.get(f).schema();
- switch (fieldSchema.getType()) {
- case MAP:
- case ARRAY:
- get.addFamily(col.family); break;
- default:
- get.addColumn(col.family, col.qualifier); break;
- }
+ addFamilyOrColumn(get, col, fieldSchema);
}
}
- private void addFields(Scan scan, Query<K,T> query)
- throws IOException {
+ /**
+  * Adds to a Get the HBase cells that back a field of the given schema:
+  * the whole column family for maps and arrays, the single family/qualifier
+  * column for everything else. A union resolved to branch 0 or 1 by
+  * {@code getResolvedUnionIndex} is treated as that branch.
+  *
+  * @param get the Get being built
+  * @param col the HBase column mapped to the field
+  * @param fieldSchema the Avro schema of the field
+  */
+ private void addFamilyOrColumn(Get get, HBaseColumn col, Schema fieldSchema) {
+   switch (fieldSchema.getType()) {
+   case UNION:
+     int index = getResolvedUnionIndex(fieldSchema);
+     if (index > 1) {
+       // Unresolved union: getResolvedUnionIndex returns 2, which is not a
+       // usable branch index (out of range for a two-branch union). Such
+       // unions are written and read as one serialized value under
+       // family/qualifier by addPutsAndDeletes and setField, so request
+       // exactly that column.
+       get.addColumn(col.family, col.qualifier);
+     } else {
+       Schema resolvedSchema = fieldSchema.getTypes().get(index);
+       addFamilyOrColumn(get, col, resolvedSchema);
+     }
+     break;
+   case MAP:
+   case ARRAY:
+     get.addFamily(col.family);
+     break;
+   default:
+     get.addColumn(col.family, col.qualifier);
+     break;
+   }
+ }
+
+ private void addFields(Scan scan, Query<K, T> query) throws IOException {
String[] fields = query.getFields();
for (String f : fields) {
HBaseColumn col = mapping.getColumn(f);
@@ -539,19 +534,30 @@ implements Configurable {
"Wrong gora-hbase-mapping.xml?");
}
Schema fieldSchema = fieldMap.get(f).schema();
- switch (fieldSchema.getType()) {
- case MAP:
- case ARRAY:
- scan.addFamily(col.family); break;
- default:
- scan.addColumn(col.family, col.qualifier); break;
- }
+ addFamilyOrColumn(scan, col, fieldSchema);
}
}
- //TODO: HBase Get, Scan, Delete should extend some common interface with addFamily, etc
- private void addFields(Delete delete, Query<K,T> query)
- throws IOException {
+ /**
+  * Adds to a Scan the HBase cells that back a field of the given schema:
+  * the whole column family for maps and arrays, the single family/qualifier
+  * column for everything else. A union resolved to branch 0 or 1 by
+  * {@code getResolvedUnionIndex} is treated as that branch.
+  *
+  * @param scan the Scan being built
+  * @param col the HBase column mapped to the field
+  * @param fieldSchema the Avro schema of the field
+  */
+ private void addFamilyOrColumn(Scan scan, HBaseColumn col, Schema fieldSchema) {
+   switch (fieldSchema.getType()) {
+   case UNION:
+     int index = getResolvedUnionIndex(fieldSchema);
+     if (index > 1) {
+       // Unresolved union: getResolvedUnionIndex returns 2, which is not a
+       // usable branch index (out of range for a two-branch union). Such
+       // unions are written and read as one serialized value under
+       // family/qualifier by addPutsAndDeletes and setField, so scan
+       // exactly that column.
+       scan.addColumn(col.family, col.qualifier);
+     } else {
+       Schema resolvedSchema = fieldSchema.getTypes().get(index);
+       addFamilyOrColumn(scan, col, resolvedSchema);
+     }
+     break;
+   case MAP:
+   case ARRAY:
+     scan.addFamily(col.family);
+     break;
+   default:
+     scan.addColumn(col.family, col.qualifier);
+     break;
+   }
+ }
+
+ // TODO: HBase Get, Scan, Delete should extend some common interface with
+ // addFamily, etc
+ private void addFields(Delete delete, Query<K, T> query) throws IOException {
String[] fields = query.getFields();
for (String f : fields) {
HBaseColumn col = mapping.getColumn(f);
@@ -560,13 +566,25 @@ implements Configurable {
"Wrong gora-hbase-mapping.xml?");
}
Schema fieldSchema = fieldMap.get(f).schema();
- switch (fieldSchema.getType()) {
- case MAP:
- case ARRAY:
- delete.deleteFamily(col.family); break;
- default:
- delete.deleteColumn(col.family, col.qualifier); break;
- }
+ addFamilyOrColumn(delete, col, fieldSchema);
+ }
+ }
+
+ /**
+  * Marks for deletion the HBase cells that back a field of the given schema:
+  * the whole column family for maps and arrays, the single family/qualifier
+  * column for everything else. A union resolved to branch 0 or 1 by
+  * {@code getResolvedUnionIndex} is treated as that branch.
+  *
+  * @param delete the Delete being built
+  * @param col the HBase column mapped to the field
+  * @param fieldSchema the Avro schema of the field
+  */
+ private void addFamilyOrColumn(Delete delete, HBaseColumn col,
+ Schema fieldSchema) {
+ switch (fieldSchema.getType()) {
+ case UNION:
+ int index = getResolvedUnionIndex(fieldSchema);
+ if (index > 1) {
+ // Unresolved union: getResolvedUnionIndex returns 2, which is not a
+ // usable branch index (out of range for a two-branch union). Such
+ // unions are written as one serialized value under family/qualifier
+ // by addPutsAndDeletes, so delete exactly that column.
+ delete.deleteColumn(col.family, col.qualifier);
+ } else {
+ Schema resolvedSchema = fieldSchema.getTypes().get(index);
+ addFamilyOrColumn(delete, col, resolvedSchema);
+ }
+ break;
+ case MAP:
+ case ARRAY:
+ delete.deleteFamily(col.family);
+ break;
+ default:
+ delete.deleteColumn(col.family, col.qualifier);
+ break;
}
}
@@ -582,7 +600,6 @@ implements Configurable {
}
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
/**
* Creates a new Persistent instance with the values in 'result' for the fields listed.
* @param result result form a HTable#get()
@@ -597,7 +614,6 @@ implements Configurable {
return null;
T persistent = newPersistent();
- StateManager stateManager = persistent.getStateManager();
for (String f : fields) {
HBaseColumn col = mapping.getColumn(f);
if (col == null) {
@@ -606,50 +622,90 @@ implements Configurable {
}
Field field = fieldMap.get(f);
Schema fieldSchema = field.schema();
- switch(fieldSchema.getType()) {
- case MAP:
- NavigableMap<byte[], byte[]> qualMap =
- result.getNoVersionMap().get(col.getFamily());
- if (qualMap == null) {
- continue;
- }
- Schema valueSchema = fieldSchema.getValueType();
- Map map = new HashMap();
- for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
- map.put(new Utf8(Bytes.toString(e.getKey())),
- fromBytes(valueSchema, e.getValue()));
- }
- setField(persistent, field, map);
- break;
- case ARRAY:
- qualMap = result.getFamilyMap(col.getFamily());
- if (qualMap == null) {
- continue;
- }
- valueSchema = fieldSchema.getElementType();
- ArrayList arrayList = new ArrayList();
- for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
- arrayList.add(fromBytes(valueSchema, e.getValue()));
- }
- ListGenericArray arr = new ListGenericArray(fieldSchema, arrayList);
- setField(persistent, field, arr);
- break;
- default:
- byte[] val = result.getValue(col.getFamily(), col.getQualifier());
- if (val == null) {
- continue;
- }
- setField(persistent, field, val);
- break;
- }
+ setField(result,persistent, col, field, fieldSchema);
}
- stateManager.clearDirty(persistent);
+ persistent.clearDirty();
return persistent;
}
+ /**
+  * Reads what is stored for one field out of an HBase result and sets it on
+  * the record; nothing is set when the result holds no data for the field.
+  * <ul>
+  * <li>UNION: a union resolved to branch 0 or 1 is read as that branch; any
+  * other union is read as one value from the family/qualifier column.</li>
+  * <li>MAP: every cell of the family becomes an entry keyed by the qualifier
+  * (as Utf8), its value decoded with the map's value schema.</li>
+  * <li>ARRAY: every cell of the family becomes an element decoded with the
+  * array's element schema.</li>
+  * <li>Anything else is read from the family/qualifier column.</li>
+  * </ul>
+  *
+  * @param result the HBase result to read from
+  * @param persistent the record being populated
+  * @param col the HBase column mapped to the field
+  * @param field the record field to set
+  * @param fieldSchema the schema used to choose the handling; the field's
+  *          own schema, or a union branch of it on recursion
+  * @throws IOException declared for the deserialization calls
+  */
+ private void setField(Result result, T persistent, HBaseColumn col,
+     Field field, Schema fieldSchema) throws IOException {
+   switch (fieldSchema.getType()) {
+   case UNION: {
+     int branch = getResolvedUnionIndex(fieldSchema);
+     if (branch <= 1) {
+       // Union resolved to a single branch: read the value as that branch.
+       setField(result, persistent, col, field,
+           fieldSchema.getTypes().get(branch));
+     } else {
+       // Unresolved union: deserialize the stored value directly for now.
+       byte[] raw = result.getValue(col.getFamily(), col.getQualifier());
+       if (raw != null) {
+         setField(persistent, field, raw);
+       }
+     }
+     return;
+   }
+   case MAP: {
+     NavigableMap<byte[], byte[]> cells = result.getNoVersionMap().get(
+         col.getFamily());
+     if (cells == null) {
+       return;
+     }
+     Schema valueSchema = fieldSchema.getValueType();
+     Map<Utf8, Object> entries = new HashMap<Utf8, Object>();
+     for (Entry<byte[], byte[]> cell : cells.entrySet()) {
+       Utf8 entryKey = new Utf8(Bytes.toString(cell.getKey()));
+       entries.put(entryKey, fromBytes(valueSchema, cell.getValue()));
+     }
+     setField(persistent, field, entries);
+     return;
+   }
+   case ARRAY: {
+     NavigableMap<byte[], byte[]> cells = result.getFamilyMap(col.getFamily());
+     if (cells == null) {
+       return;
+     }
+     Schema elementSchema = fieldSchema.getElementType();
+     ArrayList<Object> elements = new ArrayList<Object>();
+     // NOTE(review): this wrapper is only used to add the elements and is then
+     // dropped; the plain list is what gets passed on. Presumably add() writes
+     // through to the wrapped list - confirm against DirtyListWrapper.
+     DirtyListWrapper<Object> wrapper = new DirtyListWrapper<Object>(elements);
+     for (Entry<byte[], byte[]> cell : cells.entrySet()) {
+       wrapper.add(fromBytes(elementSchema, cell.getValue()));
+     }
+     setField(persistent, field, elements);
+     return;
+   }
+   default: {
+     byte[] raw = result.getValue(col.getFamily(), col.getQualifier());
+     if (raw != null) {
+       setField(persistent, field, raw);
+     }
+   }
+   }
+ }
+
+ /**
+  * Picks the branch of a union that carries the actual value.
+  *
+  * TODO temporary solution, has to be changed after implementation of saving
+  * the index of union type.
+  *
+  * @param unionScema the union schema to inspect
+  * @return 0 or 1, the index of the non-null branch, when the union has
+  *         exactly two branches of which exactly one is "null" (that is
+  *         ["type","null"] or ["null","type"]); 2 for every other union
+  */
+ private int getResolvedUnionIndex(Schema unionScema) {
+   List<Schema> branches = unionScema.getTypes();
+   if (branches.size() != 2) {
+     return 2;
+   }
+   boolean firstIsNull = branches.get(0).getType().equals(Schema.Type.NULL);
+   boolean secondIsNull = branches.get(1).getType().equals(Schema.Type.NULL);
+   if (firstIsNull == secondIsNull) {
+     // Either both branches are "null" or neither is: nothing to resolve.
+     return 2;
+   }
+   return firstIsNull ? 1 : 0;
+ }
+
@SuppressWarnings({ "unchecked", "rawtypes" })
private void setField(T persistent, Field field, Map map) {
- persistent.put(field.pos(), new StatefulHashMap(map));
+ // Store the map at the field's position in the record, wrapped in a
+ // DirtyMapWrapper.
+ persistent.put(field.pos(), new DirtyMapWrapper(map));
}
private void setField(T persistent, Field field, byte[] val)
@@ -657,9 +713,9 @@ implements Configurable {
persistent.put(field.pos(), fromBytes(field.schema(), val));
}
- @SuppressWarnings("rawtypes")
- private void setField(T persistent, Field field, GenericArray list) {
- persistent.put(field.pos(), list);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private void setField(T persistent, Field field, List list) {
+ // Store the list at the field's position in the record, wrapped in a
+ // DirtyListWrapper.
+ persistent.put(field.pos(), new DirtyListWrapper(list));
}
@SuppressWarnings("unchecked")
@@ -725,7 +781,6 @@ implements Configurable {
mappingBuilder.addField(fieldName, family, qualifier);
mappingBuilder.addColumnFamily(tableName, family);
}
-
//we found a matching key and value class definition,
//do not continue on other class definitions
break;
@@ -790,4 +845,4 @@ implements Configurable {
this.scannerCaching = numRows ;
return this ;
}
-}
\ No newline at end of file
+}
Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java Sat Apr 12 19:21:53 2014
@@ -19,12 +19,14 @@ package org.apache.gora.hbase.store;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
@@ -35,7 +37,11 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -145,18 +151,6 @@ public class HBaseTableConnection implem
}
@Override
- public void batch(List<Row> actions, Object[] results) throws IOException,
- InterruptedException {
- getTable().batch(actions, results);
- }
-
- @Override
- public Object[] batch(List<Row> actions) throws IOException,
- InterruptedException {
- return getTable().batch(actions);
- }
-
- @Override
public Result get(Get get) throws IOException {
return getTable().get(get);
}
@@ -254,4 +248,78 @@ public class HBaseTableConnection implem
public void unlockRow(RowLock rl) throws IOException {
getTable().unlockRow(rl);
}
+
+ @Override
+ public void batch(List<? extends Row> actions, Object[] results)
+ throws IOException, InterruptedException {
+ // Delegates to the table returned by getTable(); results is filled by that call.
+ getTable().batch(actions, results);
+
+ }
+
+ @Override
+ public Object[] batch(List<? extends Row> actions) throws IOException,
+ InterruptedException {
+ // Delegates to the table returned by getTable() and returns its results.
+ return getTable().batch(actions);
+ }
+
+ @Override
+ public void mutateRow(RowMutations rm) throws IOException {
+ // Forward to the underlying table so the mutations are actually applied.
+ getTable().mutateRow(rm);
+ }
+
+ @Override
+ public Result append(Append append) throws IOException {
+ // Forward to the underlying table and hand back its result instead of null.
+ return getTable().append(append);
+ }
+
+ @Override
+ public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol,
+ byte[] row) {
+ // Forward to the underlying table so callers get a usable proxy, not null.
+ return getTable().coprocessorProxy(protocol, row);
+ }
+
+ @Override
+ public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(
+ Class<T> protocol, byte[] startKey, byte[] endKey, Call<T, R> callable)
+ throws IOException, Throwable {
+ // Forward to the underlying table and return its per-region results, not null.
+ return getTable().coprocessorExec(protocol, startKey, endKey, callable);
+ }
+
+ @Override
+ public <T extends CoprocessorProtocol, R> void coprocessorExec(
+ Class<T> protocol, byte[] startKey, byte[] endKey, Call<T, R> callable,
+ Callback<R> callback) throws IOException, Throwable {
+ // Forward to the underlying table so the call runs and the callback is fed.
+ getTable().coprocessorExec(protocol, startKey, endKey, callable, callback);
+ }
+
+ @Override
+ public void setAutoFlush(boolean autoFlush) {
+ // NOTE(review): no-op - the requested auto-flush setting is ignored here.
+ // Confirm this is intended, or apply it to the table(s) of this connection.
+ }
+
+ @Override
+ public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+ // NOTE(review): no-op - both requested settings are ignored here.
+ // Confirm this is intended, or apply them to the table(s) of this connection.
+ }
+
+ @Override
+ public long getWriteBufferSize() {
+ // Report the underlying table's actual buffer size instead of a constant 0.
+ return getTable().getWriteBufferSize();
+ }
+
+ @Override
+ public void setWriteBufferSize(long writeBufferSize) throws IOException {
+ // NOTE(review): no-op - the requested write buffer size is ignored here.
+ // Confirm this is intended, or apply it to the table(s) of this connection.
+ }
}
Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java Sat Apr 12 19:21:53 2014
@@ -20,10 +20,8 @@ package org.apache.gora.hbase.util;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
@@ -31,12 +29,13 @@ import org.apache.avro.generic.GenericDa
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
+
import org.apache.gora.util.AvroUtils;
-import org.apache.gora.avro.PersistentDatumReader;
-import org.apache.gora.avro.PersistentDatumWriter;
+
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -47,27 +46,13 @@ public class HBaseByteInterface {
/**
* Threadlocals maintaining reusable binary decoders and encoders.
*/
+ private static ThreadLocal<ByteArrayOutputStream> outputStream =
+ new ThreadLocal<ByteArrayOutputStream>();
+
public static final ThreadLocal<BinaryDecoder> decoders =
new ThreadLocal<BinaryDecoder>();
- public static final ThreadLocal<BinaryEncoderWithStream> encoders =
- new ThreadLocal<BinaryEncoderWithStream>();
-
- /**
- * A BinaryEncoder that exposes the outputstream so that it can be reset
- * every time. (This is a workaround to reuse BinaryEncoder and the buffers,
- * normally provided be EncoderFactory, but this class does not exist yet
- * in the current Avro version).
- */
- public static final class BinaryEncoderWithStream extends BinaryEncoder {
- public BinaryEncoderWithStream(OutputStream out) {
- super(out);
- }
-
- protected OutputStream getOut() {
- return out;
- }
- }
-
+ public static final ThreadLocal<BinaryEncoder> encoders =
+ new ThreadLocal<BinaryEncoder>();
/*
* Create a threadlocal map for the datum readers and writers, because
* they are not thread safe, at least not before Avro 1.4.0 (See AVRO-650).
@@ -75,23 +60,15 @@ public class HBaseByteInterface {
* writer pair for every schema, instead of one for every thread.
*/
- public static final ThreadLocal<Map<String, SpecificDatumReader<?>>>
- readerMaps = new ThreadLocal<Map<String, SpecificDatumReader<?>>>() {
- protected Map<String,SpecificDatumReader<?>> initialValue() {
- return new HashMap<String, SpecificDatumReader<?>>();
- };
- };
-
- public static final ThreadLocal<Map<String, SpecificDatumWriter<?>>>
- writerMaps = new ThreadLocal<Map<String, SpecificDatumWriter<?>>>() {
- protected Map<String,SpecificDatumWriter<?>> initialValue() {
- return new HashMap<String, SpecificDatumWriter<?>>();
- };
- };
+ public static final ConcurrentHashMap<String, SpecificDatumReader<?>> readerMap =
+ new ConcurrentHashMap<String, SpecificDatumReader<?>>();
+
+ public static final ConcurrentHashMap<String, SpecificDatumWriter<?>> writerMap =
+ new ConcurrentHashMap<String, SpecificDatumWriter<?>>();
/**
- * Deserializes an array of bytes matching the given schema to the proper basic (enum, Utf8,...) or
- * complex type (Persistent/Record).
+ * Deserializes an array of bytes matching the given schema to the proper basic
+ * (enum, Utf8,...) or complex type (Persistent/Record).
*
* Does not handle <code>arrays/maps</code> if not inside a <code>record</code> type.
*
@@ -100,7 +77,7 @@ public class HBaseByteInterface {
* @return Enum|Utf8|ByteBuffer|Integer|Long|Float|Double|Boolean|Persistent|Null
* @throws IOException
*/
- @SuppressWarnings("rawtypes")
+ @SuppressWarnings({ "rawtypes" })
public static Object fromBytes(Schema schema, byte[] val) throws IOException {
Type type = schema.getType();
switch (type) {
@@ -144,37 +121,28 @@ public class HBaseByteInterface {
// => deserialize like "case RECORD"
case RECORD:
- Map<String, SpecificDatumReader<?>> readerMap = readerMaps.get();
- PersistentDatumReader<?> reader = null ;
-
- // For UNION schemas, must use a specific PersistentDatumReader
+ // For UNION schemas, must use a specific SpecificDatumReader
// from the readerMap since unions don't have own name
// (the key in the map is the string form of the union schema's hashCode())
- if (schema.getType().equals(Schema.Type.UNION)) {
- reader = (PersistentDatumReader<?>)readerMap.get(String.valueOf(schema.hashCode()));
- if (reader == null) {
- reader = new PersistentDatumReader(schema, false);// ignore dirty bits
- readerMap.put(String.valueOf(schema.hashCode()), reader);
- }
- } else {
- // ELSE use reader for Record
- reader = (PersistentDatumReader<?>)readerMap.get(schema.getFullName());
- if (reader == null) {
- reader = new PersistentDatumReader(schema, false);// ignore dirty bits
- readerMap.put(schema.getFullName(), reader);
+ String schemaId = schema.getType().equals(Schema.Type.UNION) ? String.valueOf(schema.hashCode()) : schema.getFullName();
+
+ SpecificDatumReader<?> reader = (SpecificDatumReader<?>)readerMap.get(schemaId);
+ if (reader == null) {
+ reader = new SpecificDatumReader(schema);// ignore dirty bits
+ SpecificDatumReader localReader=null;
+ if((localReader=readerMap.putIfAbsent(schemaId, reader))!=null) {
+ reader = localReader;
}
}
// initialize a decoder, possibly reusing previous one
BinaryDecoder decoderFromCache = decoders.get();
- BinaryDecoder decoder=DecoderFactory.defaultFactory().
- createBinaryDecoder(val, decoderFromCache);
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(val, null);
// put in threadlocal cache if the initial get was empty
if (decoderFromCache==null) {
decoders.set(decoder);
}
-
- return reader.read((Object)null, schema, decoder);
+ return reader.read(null, decoder);
default: throw new RuntimeException("Unknown type: "+type);
}
}
@@ -255,7 +223,7 @@ public class HBaseByteInterface {
public static byte[] toBytes(Object o, Schema schema) throws IOException {
Type type = schema.getType();
switch (type) {
- case STRING: return Bytes.toBytes(((Utf8)o).toString()); // TODO: maybe ((Utf8)o).getBytes(); ?
+ case STRING: return Bytes.toBytes(((CharSequence)o).toString()); // TODO: maybe ((Utf8)o).getBytes(); ?
case BYTES: return ((ByteBuffer)o).array();
case INT: return Bytes.toBytes((Integer)o);
case LONG: return Bytes.toBytes((Long)o);
@@ -264,65 +232,26 @@ public class HBaseByteInterface {
case BOOLEAN: return (Boolean)o ? new byte[] {1} : new byte[] {0};
case ENUM: return new byte[] { (byte)((Enum<?>) o).ordinal() };
case UNION:
- // XXX Special case: When writing the top-level field of a record we must handle the
- // special case ["null","type"] definitions: this will be written as if it was ["type"]
- // if not in a special case, will execute "case RECORD".
-
- if (schema.getTypes().size() == 2) {
-
- // schema [type0, type1]
- Type type0 = schema.getTypes().get(0).getType() ;
- Type type1 = schema.getTypes().get(1).getType() ;
-
- // Check if types are different and there's a "null", like ["null","type"] or ["type","null"]
- if (!type0.equals(type1)
- && ( type0.equals(Schema.Type.NULL)
- || type1.equals(Schema.Type.NULL))) {
-
- if (o == null) return null ;
-
- int index = GenericData.get().resolveUnion(schema, o);
- schema = schema.getTypes().get(index) ;
-
- return toBytes(o, schema) ; // Serialize as if schema was ["type"]
- }
-
- }
- // else
- // type = [type0,type1] where type0=type1
- // => Serialize like "case RECORD" with Avro
-
case RECORD:
- Map<String, SpecificDatumWriter<?>> writerMap = writerMaps.get();
- PersistentDatumWriter writer = null ;
- // For UNION schemas, must use a specific PersistentDatumReader
- // from the readerMap since unions don't have own name
- // (key name in map will be "UNION-type-type-...")
- if (schema.getType().equals(Schema.Type.UNION)) {
- writer = (PersistentDatumWriter<?>) writerMap.get(String.valueOf(schema.hashCode()));
- if (writer == null) {
- writer = new PersistentDatumWriter(schema,false);// ignore dirty bits
- writerMap.put(String.valueOf(schema.hashCode()),writer);
- }
- } else {
- // ELSE use writer for Record
- writer = (PersistentDatumWriter<?>) writerMap.get(schema.getFullName());
- if (writer == null) {
- writer = new PersistentDatumWriter(schema,false);// ignore dirty bits
- writerMap.put(schema.getFullName(),writer);
- }
+ SpecificDatumWriter writer = (SpecificDatumWriter<?>) writerMap.get(schema.getFullName());
+ if (writer == null) {
+ writer = new SpecificDatumWriter(schema);// ignore dirty bits
+ writerMap.put(schema.getFullName(),writer);
}
-
- BinaryEncoderWithStream encoder = encoders.get();
- if (encoder == null) {
- encoder = new BinaryEncoderWithStream(new ByteArrayOutputStream());
+
+ BinaryEncoder encoderFromCache = encoders.get();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ outputStream.set(bos);
+ BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(bos, null);
+ if (encoderFromCache == null) {
encoders.set(encoder);
}
+
//reset the buffers
- ByteArrayOutputStream os = (ByteArrayOutputStream) encoder.getOut();
+ ByteArrayOutputStream os = outputStream.get();
os.reset();
-
- writer.write(schema,o, encoder);
+
+ writer.write(o, encoder);
encoder.flush();
return os.toByteArray();
default: throw new RuntimeException("Unknown type: "+type);
Modified: gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml (original)
+++ gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml Sat Apr 12 19:21:53 2014
@@ -44,6 +44,7 @@
<field name="content" family="content"/>
<field name="parsedContent" family="parsedContent"/>
<field name="outlinks" family="outlinks"/>
+ <field name="headers" family="headers"/>
<field name="metadata" family="common" qualifier="metadata"/>
</class>
Modified: gora/trunk/gora-hbase/src/test/conf/hbase-site.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/test/conf/hbase-site.xml?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/test/conf/hbase-site.xml (original)
+++ gora/trunk/gora-hbase/src/test/conf/hbase-site.xml Sat Apr 12 19:21:53 2014
@@ -119,7 +119,6 @@
<description>
Maximum desired file size for an HRegion. If filesize exceeds
value + (value / 2), the HRegion is split in two. Default: 256M.
-
Keep the maximum filesize small so we split more often in tests.
</description>
</property>
@@ -129,9 +128,14 @@
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
- <value>21818</value>
+ <value>2181</value>
<description>Property from ZooKeeper's config zoo.cfg.
The port at which the clients will connect.
</description>
</property>
+ <property>
+ <name>hbase.zookeeper.quorum</name>
+ <value>localhost</value>
+ <description>Comma-separated list of hosts in the ZooKeeper quorum.</description>
+ </property>
</configuration>
Modified: gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java (original)
+++ gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java Sat Apr 12 19:21:53 2014
@@ -22,19 +22,22 @@ import org.apache.gora.GoraTestDriver;
import org.apache.gora.hbase.store.HBaseStore;
import org.apache.gora.hbase.util.HBaseClusterSingleton;
import org.apache.hadoop.conf.Configuration;
-
-//HBase imports
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
/**
* Helper class for third-party tests using the gora-hbase backend.
* @see GoraTestDriver
*/
public class GoraHBaseTestDriver extends GoraTestDriver {
+
+ /**
+ * Cluster object used for testing.
+ */
private static final HBaseClusterSingleton cluster = HBaseClusterSingleton.build(1);
+ /**
+ * Default Constructor.
+ */
public GoraHBaseTestDriver() {
super(HBaseStore.class);
}
@@ -42,6 +45,7 @@ public class GoraHBaseTestDriver extends
@Override
public void setUpClass() throws Exception {
super.setUpClass();
+ conf = getConf();
log.info("Setting up HBase Test Driver");
}
@@ -50,28 +54,40 @@ public class GoraHBaseTestDriver extends
super.tearDownClass();
log.info("Teardown HBase test driver");
}
-
+
@Override
public void setUp() throws Exception {
cluster.truncateAllTables();
// super.setUp() deletes all tables, but must only truncate in the right way -HBaseClusterSingleton-
//super.setUp();
}
-
+
@Override
public void tearDown() throws Exception {
// Do nothing. setUp() must ensure the right data.
}
+
+ /**
+ * Deletes all tables from the MiniCluster
+ * @throws Exception in case some table is not able to be deleted.
+ */
public void deleteAllTables() throws Exception {
cluster.deleteAllTables();
}
-
+
+ /**
+ * Gets the configuration from the MiniCluster.
+ * @return Configuration from MiniCluster.
+ */
public Configuration getConf() {
return cluster.getHbaseTestingUtil().getConfiguration();
}
-
+
+ /**
+ * Gets HBaseTestingUtility from the MiniCluster object.
+ * @return HBaseTestingUtility object
+ */
public HBaseTestingUtility getHbaseUtil() {
return cluster.getHbaseTestingUtil();
}
-
-}
+}
Modified: gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java?rev=1586888&r1=1586887&r2=1586888&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java (original)
+++ gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java Sat Apr 12 19:21:53 2014
@@ -18,7 +18,6 @@
package org.apache.gora.hbase.mapreduce;
-import org.apache.gora.examples.generated.TokenDatum;
import org.apache.gora.examples.generated.WebPage;
import org.apache.gora.hbase.store.HBaseStore;
import org.apache.gora.hbase.util.HBaseClusterSingleton;