You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2013/11/02 19:38:01 UTC
svn commit: r1538247 - in /gora/branches/GORA_94: ./
gora-core/src/main/java/org/apache/gora/avro/store/ gora-hbase/
gora-hbase/src/main/java/org/apache/gora/hbase/store/
gora-hbase/src/main/java/org/apache/gora/hbase/util/
gora-hbase/src/test/conf/ go...
Author: lewismc
Date: Sat Nov 2 18:38:01 2013
New Revision: 1538247
URL: http://svn.apache.org/r1538247
Log:
GORA-246v2.patch Upgrade to Avro 1.7.X in gora-hbase and HBase upgrade to 0.94.9
Modified:
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java
gora/branches/GORA_94/gora-hbase/pom.xml
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
gora/branches/GORA_94/gora-hbase/src/test/conf/hbase-site.xml
gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java
gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java
gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java
gora/branches/GORA_94/pom.xml
Modified: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java?rev=1538247&r1=1538246&r2=1538247&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java (original)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java Sat Nov 2 18:38:01 2013
@@ -23,16 +23,12 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.Properties;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.io.JsonDecoder;
-import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.gora.avro.query.AvroQuery;
Modified: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java?rev=1538247&r1=1538246&r2=1538247&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java (original)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java Sat Nov 2 18:38:01 2013
@@ -24,7 +24,6 @@ import org.apache.avro.file.DataFileRead
import org.apache.avro.file.DataFileWriter;
import org.apache.gora.avro.mapreduce.FsInput;
import org.apache.gora.avro.query.DataFileAvroResult;
-import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
Modified: gora/branches/GORA_94/gora-hbase/pom.xml
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/pom.xml?rev=1538247&r1=1538246&r2=1538247&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/pom.xml (original)
+++ gora/branches/GORA_94/gora-hbase/pom.xml Sat Nov 2 18:38:01 2013
@@ -123,7 +123,7 @@
<dependency>
- <groupId>org.apache.hadoop</groupId>
+ <groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1538247&r1=1538246&r2=1538247&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Sat Nov 2 18:38:01 2013
@@ -45,11 +45,8 @@ import org.apache.gora.hbase.query.HBase
import org.apache.gora.hbase.query.HBaseScannerResult;
import org.apache.gora.hbase.store.HBaseMapping.HBaseMappingBuilder;
import org.apache.gora.hbase.util.HBaseByteInterface;
-import org.apache.gora.persistency.ListGenericArray;
-import org.apache.gora.persistency.State;
-import org.apache.gora.persistency.StateManager;
-import org.apache.gora.persistency.StatefulHashMap;
-import org.apache.gora.persistency.StatefulMap;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
@@ -216,7 +213,6 @@ implements Configurable {
public void put(K key, T persistent) {
try{
Schema schema = persistent.getSchema();
- StateManager stateManager = persistent.getStateManager();
byte[] keyRaw = toBytes(key);
Put put = new Put(keyRaw);
Delete delete = new Delete(keyRaw);
@@ -225,7 +221,7 @@ implements Configurable {
Iterator<Field> iter = schema.getFields().iterator();
for (int i = 0; iter.hasNext(); i++) {
Field field = iter.next();
- if (!stateManager.isDirty(persistent, i)) {
+ if (i==0 || !persistent.isDirty(i)) {
continue;
}
Type type = field.schema().getType();
@@ -237,43 +233,17 @@ implements Configurable {
}
switch(type) {
case MAP:
- if(o instanceof StatefulMap) {
- StatefulHashMap<Utf8, ?> map = (StatefulHashMap<Utf8, ?>) o;
- for (Entry<Utf8, State> e : map.states().entrySet()) {
- Utf8 mapKey = e.getKey();
- switch (e.getValue()) {
- case DIRTY:
- byte[] qual = Bytes.toBytes(mapKey.toString());
- byte[] val = toBytes(map.get(mapKey), field.schema().getValueType());
- // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
- if (val == null) { // value == null => must delete the column
- delete.deleteColumn(hcol.getFamily(), qual);
- hasDeletes = true;
- } else {
- put.add(hcol.getFamily(), qual, val);
- hasPuts = true;
- }
- break;
- case DELETED:
- qual = Bytes.toBytes(mapKey.toString());
- hasDeletes = true;
- delete.deleteColumn(hcol.getFamily(), qual);
- break;
- }
- }
- } else {
- Set<Map.Entry> set = ((Map)o).entrySet();
- for(Entry entry: set) {
- byte[] qual = toBytes(entry.getKey());
- byte[] val = toBytes(entry.getValue(), field.schema().getValueType());
- // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
- if (val == null) { // value == null => must delete the column
- delete.deleteColumn(hcol.getFamily(), qual);
- hasDeletes = true;
- } else {
- put.add(hcol.getFamily(), qual, val);
- hasPuts = true;
- }
+ Set<Map.Entry> set = ((Map)o).entrySet();
+ for(Entry entry: set) {
+ byte[] qual = toBytes(entry.getKey());
+ byte[] val = toBytes(entry.getValue(), field.schema().getValueType());
+ // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+ if (val == null) { // value == null => must delete the column
+ delete.deleteColumn(hcol.getFamily(), qual);
+ hasDeletes = true;
+ } else {
+ put.add(hcol.getFamily(), qual, val);
+ hasPuts = true;
}
}
break;
@@ -347,8 +317,7 @@ implements Configurable {
String[] fields = getFieldsToQuery(query.getFields());
//find whether all fields are queried, which means that complete
//rows will be deleted
- boolean isAllFields = Arrays.equals(fields
- , getBeanFactory().getCachedPersistent().getFields());
+ boolean isAllFields = Arrays.equals(fields, getFields());
org.apache.gora.query.Result<K, T> result = null;
result = query.execute();
@@ -561,7 +530,6 @@ implements Configurable {
return null;
T persistent = newPersistent();
- StateManager stateManager = persistent.getStateManager();
for (String f : fields) {
HBaseColumn col = mapping.getColumn(f);
if (col == null) {
@@ -592,11 +560,11 @@ implements Configurable {
}
valueSchema = fieldSchema.getElementType();
ArrayList arrayList = new ArrayList();
+ DirtyListWrapper dirtyListWrapper = new DirtyListWrapper(arrayList);
for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
- arrayList.add(fromBytes(valueSchema, e.getValue()));
+ dirtyListWrapper.add(fromBytes(valueSchema, e.getValue()));
}
- ListGenericArray arr = new ListGenericArray(fieldSchema, arrayList);
- setField(persistent, field, arr);
+ setField(persistent, field, arrayList);
break;
default:
byte[] val = result.getValue(col.getFamily(), col.getQualifier());
@@ -607,13 +575,13 @@ implements Configurable {
break;
}
}
- stateManager.clearDirty(persistent);
+ persistent.clearDirty();
return persistent;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void setField(T persistent, Field field, Map map) {
- persistent.put(field.pos(), new StatefulHashMap(map));
+ persistent.put(field.pos(), new DirtyMapWrapper(map));
}
private void setField(T persistent, Field field, byte[] val)
@@ -622,8 +590,8 @@ implements Configurable {
}
@SuppressWarnings("rawtypes")
- private void setField(T persistent, Field field, GenericArray list) {
- persistent.put(field.pos(), list);
+ private void setField(T persistent, Field field, List list) {
+ persistent.put(field.pos(), new DirtyListWrapper(list));
}
@SuppressWarnings("unchecked")
Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java?rev=1538247&r1=1538246&r2=1538247&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java Sat Nov 2 18:38:01 2013
@@ -19,12 +19,14 @@ package org.apache.gora.hbase.store;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
@@ -35,7 +37,11 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -145,18 +151,6 @@ public class HBaseTableConnection implem
}
@Override
- public void batch(List<Row> actions, Object[] results) throws IOException,
- InterruptedException {
- getTable().batch(actions, results);
- }
-
- @Override
- public Object[] batch(List<Row> actions) throws IOException,
- InterruptedException {
- return getTable().batch(actions);
- }
-
- @Override
public Result get(Get get) throws IOException {
return getTable().get(get);
}
@@ -254,4 +248,78 @@ public class HBaseTableConnection implem
public void unlockRow(RowLock rl) throws IOException {
getTable().unlockRow(rl);
}
+
+ @Override
+ public void batch(List<? extends Row> actions, Object[] results)
+ throws IOException, InterruptedException {
+ // TODO Auto-generated method stub
+ getTable().batch(actions, results);
+
+ }
+
+ @Override
+ public Object[] batch(List<? extends Row> actions) throws IOException,
+ InterruptedException {
+ // TODO Auto-generated method stub
+ return getTable().batch(actions);
+ }
+
+ @Override
+ public void mutateRow(RowMutations rm) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public Result append(Append append) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol,
+ byte[] row) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(
+ Class<T> protocol, byte[] startKey, byte[] endKey, Call<T, R> callable)
+ throws IOException, Throwable {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public <T extends CoprocessorProtocol, R> void coprocessorExec(
+ Class<T> protocol, byte[] startKey, byte[] endKey, Call<T, R> callable,
+ Callback<R> callback) throws IOException, Throwable {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setAutoFlush(boolean autoFlush) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public long getWriteBufferSize() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void setWriteBufferSize(long writeBufferSize) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
}
Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java?rev=1538247&r1=1538246&r2=1538247&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java Sat Nov 2 18:38:01 2013
@@ -21,22 +21,29 @@ package org.apache.gora.hbase.util;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.Persistent;
import org.apache.gora.util.AvroUtils;
-import org.apache.gora.avro.PersistentDatumReader;
-import org.apache.gora.avro.PersistentDatumWriter;
+import org.apache.gora.util.ClassLoadingUtils;
+import org.apache.gora.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -47,27 +54,13 @@ public class HBaseByteInterface {
/**
* Threadlocals maintaining reusable binary decoders and encoders.
*/
+ private static ThreadLocal<ByteArrayOutputStream> outputStream =
+ new ThreadLocal<ByteArrayOutputStream>();
+
public static final ThreadLocal<BinaryDecoder> decoders =
new ThreadLocal<BinaryDecoder>();
- public static final ThreadLocal<BinaryEncoderWithStream> encoders =
- new ThreadLocal<BinaryEncoderWithStream>();
-
- /**
- * A BinaryEncoder that exposes the outputstream so that it can be reset
- * every time. (This is a workaround to reuse BinaryEncoder and the buffers,
- * normally provided be EncoderFactory, but this class does not exist yet
- * in the current Avro version).
- */
- public static final class BinaryEncoderWithStream extends BinaryEncoder {
- public BinaryEncoderWithStream(OutputStream out) {
- super(out);
- }
-
- protected OutputStream getOut() {
- return out;
- }
- }
-
+ public static final ThreadLocal<BinaryEncoder> encoders =
+ new ThreadLocal<BinaryEncoder>();
/*
* Create a threadlocal map for the datum readers and writers, because
* they are not thread safe, at least not before Avro 1.4.0 (See AVRO-650).
@@ -75,19 +68,9 @@ public class HBaseByteInterface {
* writer pair for every schema, instead of one for every thread.
*/
- public static final ThreadLocal<Map<String, SpecificDatumReader<?>>>
- readerMaps = new ThreadLocal<Map<String, SpecificDatumReader<?>>>() {
- protected Map<String,SpecificDatumReader<?>> initialValue() {
- return new HashMap<String, SpecificDatumReader<?>>();
- };
- };
-
- public static final ThreadLocal<Map<String, SpecificDatumWriter<?>>>
- writerMaps = new ThreadLocal<Map<String, SpecificDatumWriter<?>>>() {
- protected Map<String,SpecificDatumWriter<?>> initialValue() {
- return new HashMap<String, SpecificDatumWriter<?>>();
- };
- };
+ public static final ConcurrentHashMap<String, SpecificDatumReader<?>> readerMap = new ConcurrentHashMap<String, SpecificDatumReader<?>>();
+
+ public static final ConcurrentHashMap<String, SpecificDatumWriter<?>> writerMap = new ConcurrentHashMap<String, SpecificDatumWriter<?>>();
/**
* Deserializes an array of bytes matching the given schema to the proper basic (enum, Utf8,...) or
@@ -100,7 +83,7 @@ public class HBaseByteInterface {
* @return Enum|Utf8|ByteBuffer|Integer|Long|Float|Double|Boolean|Persistent|Null
* @throws IOException
*/
- @SuppressWarnings("rawtypes")
+ @SuppressWarnings({ "rawtypes", "unchecked" })
public static Object fromBytes(Schema schema, byte[] val) throws IOException {
Type type = schema.getType();
switch (type) {
@@ -144,37 +127,28 @@ public class HBaseByteInterface {
// => deserialize like "case RECORD"
case RECORD:
- Map<String, SpecificDatumReader<?>> readerMap = readerMaps.get();
- PersistentDatumReader<?> reader = null ;
-
- // For UNION schemas, must use a specific PersistentDatumReader
+ // For UNION schemas, must use a specific SpecificDatumReader
// from the readerMap since unions don't have own name
// (key name in map will be "UNION-type-type-...")
- if (schema.getType().equals(Schema.Type.UNION)) {
- reader = (PersistentDatumReader<?>)readerMap.get(String.valueOf(schema.hashCode()));
- if (reader == null) {
- reader = new PersistentDatumReader(schema, false);// ignore dirty bits
- readerMap.put(String.valueOf(schema.hashCode()), reader);
- }
- } else {
- // ELSE use reader for Record
- reader = (PersistentDatumReader<?>)readerMap.get(schema.getFullName());
- if (reader == null) {
- reader = new PersistentDatumReader(schema, false);// ignore dirty bits
- readerMap.put(schema.getFullName(), reader);
+ String schemaId = schema.getType().equals(Schema.Type.UNION) ? String.valueOf(schema.hashCode()) : schema.getFullName();
+
+ SpecificDatumReader<?> reader = (SpecificDatumReader<?>)readerMap.get(schemaId);
+ if (reader == null) {
+ reader = new SpecificDatumReader(schema);// ignore dirty bits
+ SpecificDatumReader localReader=null;
+ if((localReader=readerMap.putIfAbsent(schemaId, reader))!=null) {
+ reader = localReader;
}
}
// initialize a decoder, possibly reusing previous one
BinaryDecoder decoderFromCache = decoders.get();
- BinaryDecoder decoder=DecoderFactory.defaultFactory().
- createBinaryDecoder(val, decoderFromCache);
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(val, null);
// put in threadlocal cache if the initial get was empty
if (decoderFromCache==null) {
decoders.set(decoder);
}
-
- return reader.read((Object)null, schema, decoder);
+ return reader.read(null, decoder);
default: throw new RuntimeException("Unknown type: "+type);
}
}
@@ -253,7 +227,7 @@ public class HBaseByteInterface {
public static byte[] toBytes(Object o, Schema schema) throws IOException {
Type type = schema.getType();
switch (type) {
- case STRING: return Bytes.toBytes(((Utf8)o).toString()); // TODO: maybe ((Utf8)o).getBytes(); ?
+ case STRING: return Bytes.toBytes(((CharSequence)o).toString()); // TODO: maybe ((Utf8)o).getBytes(); ?
case BYTES: return ((ByteBuffer)o).array();
case INT: return Bytes.toBytes((Integer)o);
case LONG: return Bytes.toBytes((Long)o);
@@ -291,36 +265,25 @@ public class HBaseByteInterface {
// => Serialize like "case RECORD" with Avro
case RECORD:
- Map<String, SpecificDatumWriter<?>> writerMap = writerMaps.get();
- PersistentDatumWriter writer = null ;
- // For UNION schemas, must use a specific PersistentDatumReader
- // from the readerMap since unions don't have own name
- // (key name in map will be "UNION-type-type-...")
- if (schema.getType().equals(Schema.Type.UNION)) {
- writer = (PersistentDatumWriter<?>) writerMap.get(String.valueOf(schema.hashCode()));
- if (writer == null) {
- writer = new PersistentDatumWriter(schema,false);// ignore dirty bits
- writerMap.put(String.valueOf(schema.hashCode()),writer);
- }
- } else {
- // ELSE use writer for Record
- writer = (PersistentDatumWriter<?>) writerMap.get(schema.getFullName());
- if (writer == null) {
- writer = new PersistentDatumWriter(schema,false);// ignore dirty bits
- writerMap.put(schema.getFullName(),writer);
- }
+ SpecificDatumWriter writer = (SpecificDatumWriter<?>) writerMap.get(schema.getFullName());
+ if (writer == null) {
+ writer = new SpecificDatumWriter(schema);// ignore dirty bits
+ writerMap.put(schema.getFullName(),writer);
}
- BinaryEncoderWithStream encoder = encoders.get();
- if (encoder == null) {
- encoder = new BinaryEncoderWithStream(new ByteArrayOutputStream());
+ BinaryEncoder encoderFromCache = encoders.get();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ outputStream.set(bos);
+ BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(bos, null);
+ if (encoderFromCache == null) {
encoders.set(encoder);
}
+
//reset the buffers
- ByteArrayOutputStream os = (ByteArrayOutputStream) encoder.getOut();
+ ByteArrayOutputStream os = outputStream.get();
os.reset();
- writer.write(schema,o, encoder);
+ writer.write(o, encoder);
encoder.flush();
return os.toByteArray();
default: throw new RuntimeException("Unknown type: "+type);
Modified: gora/branches/GORA_94/gora-hbase/src/test/conf/hbase-site.xml
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/test/conf/hbase-site.xml?rev=1538247&r1=1538246&r2=1538247&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/test/conf/hbase-site.xml (original)
+++ gora/branches/GORA_94/gora-hbase/src/test/conf/hbase-site.xml Sat Nov 2 18:38:01 2013
@@ -129,7 +129,7 @@
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
- <value>21818</value>
+ <value>2181</value>
<description>Property from ZooKeeper's config zoo.cfg.
The port at which the clients will connect.
</description>
Modified: gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java?rev=1538247&r1=1538246&r2=1538247&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/GoraHBaseTestDriver.java Sat Nov 2 18:38:01 2013
@@ -25,8 +25,6 @@ import org.apache.hadoop.conf.Configurat
//HBase imports
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
/**
* Helper class for third part tests using gora-hbase backend.
Modified: gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java?rev=1538247&r1=1538246&r2=1538247&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreCountQuery.java Sat Nov 2 18:38:01 2013
@@ -18,7 +18,6 @@
package org.apache.gora.hbase.mapreduce;
-import org.apache.gora.examples.generated.TokenDatum;
import org.apache.gora.examples.generated.WebPage;
import org.apache.gora.hbase.store.HBaseStore;
import org.apache.gora.hbase.util.HBaseClusterSingleton;
Modified: gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java?rev=1538247&r1=1538246&r2=1538247&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java Sat Nov 2 18:38:01 2013
@@ -56,7 +56,7 @@ public class TestHBaseStore extends Data
@Override
public void setUp() throws Exception {
super.setUp();
- conf = getTestDriver().getHbaseUtil().getConfiguration();
+ conf = getTestDriver().getConf();
}
@SuppressWarnings("unchecked")
@@ -87,7 +87,7 @@ public class TestHBaseStore extends Data
@Override
public void assertPutArray() throws IOException {
- HTable table = new HTable("WebPage");
+ HTable table = new HTable(conf,"WebPage");
Get get = new Get(Bytes.toBytes("com.example/http"));
org.apache.hadoop.hbase.client.Result result = table.get(get);
@@ -109,7 +109,7 @@ public class TestHBaseStore extends Data
public void assertPutBytes(byte[] contentBytes) throws IOException {
// Check first the parameter "contentBytes" if written+read right.
- HTable table = new HTable("WebPage");
+ HTable table = new HTable(conf,"WebPage");
Get get = new Get(Bytes.toBytes("com.example/http"));
org.apache.hadoop.hbase.client.Result result = table.get(get);
@@ -130,7 +130,7 @@ public class TestHBaseStore extends Data
page = webPageStore.get("com.example/http") ;
assertNull(page.getContent()) ;
// Check directly with HBase
- table = new HTable("WebPage");
+ table = new HTable(conf,"WebPage");
get = new Get(Bytes.toBytes("com.example/http"));
result = table.get(get);
actualBytes = result.getValue(Bytes.toBytes("content"), null);
@@ -146,7 +146,7 @@ public class TestHBaseStore extends Data
page = webPageStore.get("com.example/http") ;
assertTrue(Arrays.equals("".getBytes(),page.getContent().array())) ;
// Check directly with HBase
- table = new HTable("WebPage");
+ table = new HTable(conf,"WebPage");
get = new Get(Bytes.toBytes("com.example/http"));
result = table.get(get);
actualBytes = result.getValue(Bytes.toBytes("content"), null);
@@ -162,9 +162,9 @@ public class TestHBaseStore extends Data
@Test
public void assertTopLevelUnions() throws Exception {
WebPage page = webPageStore.newPersistent();
-
+
// Write webpage data
- page.setUrl(new Utf8("http://example.com"));
+ page.setUrl((CharSequence) new Utf8("http://example.com"));
byte[] contentBytes = "example content in example.com".getBytes();
ByteBuffer buff = ByteBuffer.wrap(contentBytes);
page.setContent(buff);
@@ -172,7 +172,7 @@ public class TestHBaseStore extends Data
webPageStore.flush() ;
// Read directly from HBase
- HTable table = new HTable("WebPage");
+ HTable table = new HTable(conf,"WebPage");
Get get = new Get(Bytes.toBytes("com.example/http"));
org.apache.hadoop.hbase.client.Result result = table.get(get);
@@ -180,6 +180,7 @@ public class TestHBaseStore extends Data
assertNotNull(bytesRead) ;
assertTrue(Arrays.equals(bytesRead, contentBytes));
+ table.close();
}
/**
@@ -193,14 +194,14 @@ public class TestHBaseStore extends Data
WebPage page = webPageStore.newPersistent();
// Write webpage data
- page.setUrl(new Utf8("http://example.com"));
+ page.setUrl((CharSequence) new Utf8("http://example.com"));
page.setContent(null); // This won't change internal field status to dirty, so
page.setDirty("content") ; // need to change it manually
webPageStore.put("com.example/http", page);
webPageStore.flush() ;
// Read directly from HBase
- HTable table = new HTable("WebPage");
+ HTable table = new HTable(conf,"WebPage");
Get get = new Get(Bytes.toBytes("com.example/http"));
org.apache.hadoop.hbase.client.Result result = table.get(get);
@@ -212,7 +213,7 @@ public class TestHBaseStore extends Data
@Override
public void assertPutMap() throws IOException {
- HTable table = new HTable("WebPage");
+ HTable table = new HTable(conf,"WebPage");
Get get = new Get(Bytes.toBytes("com.example/http"));
org.apache.hadoop.hbase.client.Result result = table.get(get);
Modified: gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java?rev=1538247&r1=1538246&r2=1538247&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java Sat Nov 2 18:38:01 2013
@@ -20,6 +20,7 @@ package org.apache.gora.hbase.util;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
@@ -30,7 +31,9 @@ import java.util.concurrent.Future;
import org.apache.avro.util.Utf8;
import org.apache.gora.examples.generated.Employee;
import org.apache.gora.examples.generated.Metadata;
+
import static org.junit.Assert.assertEquals;
+
import org.junit.Test;
public class TestHBaseByteInterface {
@@ -42,39 +45,40 @@ public class TestHBaseByteInterface {
for (int i=0; i < 1000; i++) {
//employer
- Utf8 name = new Utf8("john");
+ CharSequence name = (CharSequence) new Utf8("john");
long dateOfBirth = System.currentTimeMillis();
int salary = 1337;
- Utf8 ssn = new Utf8(String.valueOf(RANDOM.nextLong()));
+ CharSequence ssn = (CharSequence) new Utf8(String.valueOf(RANDOM.nextLong()));
- Employee e = new Employee();
+ Employee e = Employee.newBuilder().build();
e.setName(name);
e.setDateOfBirth(dateOfBirth);
e.setSalary(salary);
e.setSsn(ssn);
- byte[] employerBytes = HBaseByteInterface.toBytes(e, Employee._SCHEMA);
- Employee e2 = (Employee) HBaseByteInterface.fromBytes(Employee._SCHEMA,
+ byte[] employerBytes = HBaseByteInterface.toBytes(e, Employee.SCHEMA$);
+ Employee e2 = (Employee) HBaseByteInterface.fromBytes(Employee.SCHEMA$,
employerBytes);
assertEquals(name, e2.getName());
- assertEquals(dateOfBirth, e2.getDateOfBirth());
- assertEquals(salary, e2.getSalary());
+ assertEquals(dateOfBirth, e2.getDateOfBirth().longValue());
+ assertEquals(salary, e2.getSalary().intValue());
assertEquals(ssn, e2.getSsn());
//metadata
- Utf8 key = new Utf8("theKey");
- Utf8 value = new Utf8("theValue " + RANDOM.nextLong());
-
- Metadata m = new Metadata();
- m.putToData(key, value);
+ CharSequence key = (CharSequence) new Utf8("theKey");
+ CharSequence value = (CharSequence) new Utf8("theValue " + RANDOM.nextLong());
+ HashMap<CharSequence, CharSequence> data = new HashMap<CharSequence, CharSequence>();
+ data.put(key, value);
+ Metadata m = Metadata.newBuilder().build();
+ m.setData(data);
- byte[] datumBytes = HBaseByteInterface.toBytes(m, Metadata._SCHEMA);
- Metadata m2 = (Metadata) HBaseByteInterface.fromBytes(Metadata._SCHEMA,
+ byte[] datumBytes = HBaseByteInterface.toBytes(m, Metadata.SCHEMA$);
+ Metadata m2 = (Metadata) HBaseByteInterface.fromBytes(Metadata.SCHEMA$,
datumBytes);
- assertEquals(value, m2.getFromData(key));
+ assertEquals(value, m2.getData().get(key));
}
}
Modified: gora/branches/GORA_94/pom.xml
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/pom.xml?rev=1538247&r1=1538246&r2=1538247&view=diff
==============================================================================
--- gora/branches/GORA_94/pom.xml (original)
+++ gora/branches/GORA_94/pom.xml Sat Nov 2 18:38:01 2013
@@ -574,8 +574,8 @@
<module>gora-compiler</module>
<module>gora-compiler-cli</module>
<module>gora-core</module>
- <!--module>gora-hbase</module>
- <module>gora-accumulo</module-->
+ <module>gora-hbase</module>
+ <!--module>gora-accumulo</module-->
<!--module>gora-cassandra</module-->
<!-- module>gora-solr</module-->
<!--module>gora-dynamodb</module-->
@@ -592,7 +592,7 @@
<!-- Hadoop Dependencies -->
<hadoop.version>1.0.1</hadoop.version>
<hadoop.test.version>1.0.1</hadoop.test.version>
- <hbase.version>0.90.4</hbase.version>
+ <hbase.version>0.94.9</hbase.version>
<avro.version>1.7.4</avro.version>
<cxf-rt-frontend-jaxrs.version>2.5.2</cxf-rt-frontend-jaxrs.version>
<!-- Amazon Dependencies -->