You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2014/06/04 18:36:30 UTC

[13/50] [abbrv] git commit: GORA-321. Merge GORA_94 into Gora trunk

GORA-321. Merge GORA_94 into Gora trunk

git-svn-id: https://svn.apache.org/repos/asf/gora/trunk@1586888 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/136fc595
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/136fc595
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/136fc595

Branch: refs/heads/master
Commit: 136fc595ad3ff7e47bc9878967d2079948b36bc1
Parents: d1c4ac4
Author: Renato Javier Marroquín Mogrovejo <rmarroquin@apache.org>
Authored: Sat Apr 12 19:21:53 2014 +0000
Committer: Damien Raude-Morvan <da...@dictanova.com>
Committed: Wed Apr 16 00:18:55 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  22 +-
 bin/gora                                        |   4 +-
 gora-accumulo/pom.xml                           |  39 +-
 .../gora/accumulo/encoders/BinaryEncoder.java   |   6 +
 .../gora/accumulo/query/AccumuloResult.java     |   2 +-
 .../gora/accumulo/store/AccumuloStore.java      | 564 +++++++-----
 .../test/resources/gora-accumulo-mapping.xml    |   1 +
 .../src/test/resources/gora.properties          |   2 +-
 gora-cassandra/pom.xml                          |  32 +-
 .../gora/cassandra/query/CassandraColumn.java   |  19 +-
 .../gora/cassandra/query/CassandraResult.java   |  59 +-
 .../cassandra/query/CassandraSubColumn.java     |  69 +-
 .../cassandra/query/CassandraSuperColumn.java   | 103 ++-
 .../serializers/AvroSerializerUtil.java         |  94 ++
 .../serializers/CharSequenceSerializer.java     |  64 ++
 .../serializers/GenericArraySerializer.java     | 199 -----
 .../serializers/GoraSerializerTypeInferer.java  |  65 +-
 .../cassandra/serializers/ListSerializer.java   | 193 ++++
 .../cassandra/serializers/MapSerializer.java    | 223 +++++
 .../serializers/SpecificFixedSerializer.java    |   5 -
 .../serializers/StatefulHashMapSerializer.java  | 236 -----
 .../gora/cassandra/serializers/TypeUtils.java   |  31 +-
 .../cassandra/serializers/Utf8Serializer.java   |  62 --
 .../gora/cassandra/store/CassandraClient.java   | 325 ++++---
 .../gora/cassandra/store/CassandraMapping.java  |  38 +-
 .../store/CassandraMappingManager.java          |   4 +-
 .../gora/cassandra/store/CassandraStore.java    | 456 ++++++----
 .../gora/cassandra/store/HectorUtils.java       |  17 +-
 gora-cassandra/src/test/conf/cassandra.yaml     |  16 +-
 .../src/test/conf/gora-cassandra-mapping.xml    |   3 +-
 gora-cassandra/src/test/conf/gora.properties    |  20 +-
 .../src/test/conf/log4j-server.properties       |   3 +
 .../gora/cassandra/GoraCassandraTestDriver.java |  42 +-
 .../cassandra/store/TestCassandraStore.java     |  42 +-
 gora-compiler-cli/pom.xml                       |  68 ++
 .../gora/compiler/cli/GoraCompilerCLI.java      |  69 ++
 gora-compiler/pom.xml                           | 104 +++
 .../org/apache/gora/compiler/GoraCompiler.java  | 269 ++++++
 .../apache/gora/compiler/templates/record.vm    | 349 ++++++++
 gora-core/pom.xml                               |  18 +-
 gora-core/src/examples/avro/employee.json       |  29 +-
 .../src/examples/avro/immutable_fields.json     |  18 +
 gora-core/src/examples/avro/tokendatum.json     |   2 +-
 gora-core/src/examples/avro/webpage.json        |  21 +-
 .../gora/examples/WebPageDataCreator.java       |  68 +-
 .../gora/examples/generated/Employee.java       | 741 ++++++++++++----
 .../examples/generated/ImmutableFields.java     | 335 +++++++
 .../gora/examples/generated/Metadata.java       | 407 ++++++---
 .../gora/examples/generated/TokenDatum.java     | 304 ++++---
 .../org/apache/gora/examples/generated/V2.java  | 251 ++++++
 .../apache/gora/examples/generated/WebPage.java | 748 ++++++++++++----
 .../apache/gora/avro/PersistentDatumReader.java | 260 ------
 .../apache/gora/avro/PersistentDatumWriter.java | 123 ---
 .../org/apache/gora/avro/store/AvroStore.java   |  15 +-
 .../gora/avro/store/DataFileAvroStore.java      |   1 -
 .../apache/gora/filter/MapFieldValueFilter.java |   3 +-
 .../gora/filter/SingleFieldValueFilter.java     |   2 +-
 .../gora/mapreduce/FakeResolvingDecoder.java    | 170 ----
 .../gora/mapreduce/GoraMapReduceUtils.java      |  12 +-
 .../gora/mapreduce/PersistentDeserializer.java  |  34 +-
 .../PersistentNonReusingSerialization.java      |  43 -
 .../gora/mapreduce/PersistentSerialization.java |   8 +-
 .../gora/mapreduce/PersistentSerializer.java    |  42 +-
 .../org/apache/gora/memory/store/MemStore.java  |  47 +-
 .../org/apache/gora/persistency/Dirtyable.java  |  41 +
 .../gora/persistency/ListGenericArray.java      | 109 ---
 .../org/apache/gora/persistency/Persistent.java | 242 ++---
 .../java/org/apache/gora/persistency/State.java |  37 -
 .../apache/gora/persistency/StateManager.java   | 110 ---
 .../gora/persistency/StatefulHashMap.java       | 132 ---
 .../apache/gora/persistency/StatefulMap.java    |  43 -
 .../org/apache/gora/persistency/Tombstone.java  |  16 +
 .../org/apache/gora/persistency/Tombstones.java | 242 +++++
 .../gora/persistency/impl/BeanFactoryImpl.java  |  25 +-
 .../impl/DirtyCollectionWrapper.java            | 145 +++
 .../apache/gora/persistency/impl/DirtyFlag.java |  56 ++
 .../persistency/impl/DirtyIteratorWrapper.java  |  35 +
 .../persistency/impl/DirtyListIterator.java     |  67 ++
 .../gora/persistency/impl/DirtyListWrapper.java |  97 ++
 .../gora/persistency/impl/DirtyMapWrapper.java  | 194 ++++
 .../gora/persistency/impl/DirtySetWrapper.java  |  15 +
 .../gora/persistency/impl/PersistentBase.java   | 405 +++------
 .../gora/persistency/impl/StateManagerImpl.java | 104 ---
 .../persistency/ws/impl/BeanFactoryWSImpl.java  |  12 +-
 .../persistency/ws/impl/PersistentWSBase.java   | 169 +---
 .../persistency/ws/impl/StateManagerWSImpl.java | 149 ----
 .../org/apache/gora/store/DataStoreFactory.java |  31 +-
 .../apache/gora/store/impl/DataStoreBase.java   |  51 +-
 .../java/org/apache/gora/util/AvroUtils.java    |  86 +-
 .../java/org/apache/gora/util/ByteUtils.java    |  34 +-
 .../main/java/org/apache/gora/util/IOUtils.java |  78 +-
 .../org/apache/gora/util/ReflectionUtils.java   |  10 +
 .../src/test/conf/hadoop-metrics2.properties    |   8 +
 .../java/org/apache/gora/GoraTestDriver.java    |  13 +-
 .../gora/avro/TestPersistentDatumReader.java    | 104 ---
 .../gora/examples/TestWebPageDataCreator.java   |  19 +
 .../gora/filter/TestMapFieldValueFilter.java    |  15 +-
 .../gora/filter/TestSingleFieldValueFilter.java |   7 +-
 .../mapreduce/DataStoreMapReduceTestBase.java   |   3 +-
 .../gora/mapreduce/MapReduceTestUtils.java      |  23 +-
 .../gora/mapreduce/TestGoraInputFormat.java     |  24 +-
 .../mapreduce/TestPersistentSerialization.java  |  66 +-
 .../gora/mock/persistency/MockPersistent.java   |  22 +-
 .../gora/persistency/TestListGenericArray.java  |  57 --
 .../persistency/impl/TestPersistentBase.java    |  93 +-
 .../persistency/impl/TestStateManagerImpl.java  | 115 ---
 .../apache/gora/store/DataStoreTestBase.java    |  19 +-
 .../apache/gora/store/DataStoreTestUtil.java    | 473 ++++++++--
 .../apache/gora/store/TestDataStoreFactory.java |   6 +-
 .../java/org/apache/gora/util/TestIOUtils.java  |   9 +-
 .../gora/dynamodb/store/DynamoDBStore.java      |   9 +-
 gora-hbase/pom.xml                              |  35 +-
 .../apache/gora/hbase/query/HBaseGetResult.java |   1 -
 .../apache/gora/hbase/query/HBaseResult.java    |   1 -
 .../org/apache/gora/hbase/store/HBaseStore.java | 413 +++++----
 .../gora/hbase/store/HBaseTableConnection.java  |  92 +-
 .../gora/hbase/util/HBaseByteInterface.java     | 157 +---
 gora-hbase/src/test/conf/gora-hbase-mapping.xml |   1 +
 gora-hbase/src/test/conf/hbase-site.xml         |   8 +-
 .../apache/gora/hbase/GoraHBaseTestDriver.java  |  36 +-
 .../mapreduce/TestHBaseStoreCountQuery.java     |   1 -
 .../apache/gora/hbase/store/TestHBaseStore.java |  37 +-
 .../gora/hbase/util/HBaseClusterSingleton.java  |   3 +
 .../gora/hbase/util/TestHBaseByteInterface.java |  37 +-
 gora-solr/pom.xml                               |  31 +-
 .../org/apache/gora/solr/store/SolrStore.java   | 610 ++++++++-----
 gora-solr/src/test/conf/gora-solr-mapping.xml   |   3 +
 gora-solr/src/test/conf/gora.properties         |   1 +
 .../src/test/conf/solr/Employee/conf/schema.xml |   5 +-
 .../src/test/conf/solr/WebPage/conf/schema.xml  |   3 +-
 .../apache/gora/solr/store/TestSolrStore.java   |  46 +-
 gora-tutorial/pom.xml                           |   2 +-
 gora-tutorial/src/main/avro/metricdatum.json    |   8 +-
 gora-tutorial/src/main/avro/pageview.json       |  18 +-
 .../apache/gora/tutorial/log/LogAnalytics.java  |   2 +-
 .../tutorial/log/generated/MetricDatum.java     | 442 ++++++++--
 .../gora/tutorial/log/generated/Pageview.java   | 874 ++++++++++++++++---
 pom.xml                                         | 111 ++-
 138 files changed, 9036 insertions(+), 5383 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4da6704..25caab0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,13 +4,29 @@
 
 Gora Change Log
 
+* GORA-106 Migrate Gora website documentation to Apache CMS (lewismc)
+
+* GORA-296 Improve 'Keyclass and nameclass match' logging in HBaseStore (rmarroquin via lewismc)
+
+* GORA-246 Upgrade to Avro 1.7.X in gora-hbase (Alparslan Avcı, rmarroquin, lewismc via lewismc)
+
+* GORA-154 delete() method is not implemented at CassandraStore, and always returns false or 0 (rmarroquin via Kazuomi Kashii)
+
+* GORA-204 Don't store empty arrays in CassandraClient#addGenericArray(), addStatefulHashMap() and CassandraStore#addOrUpdateField(rmarroquin via lewismc)
+
+* GORA-303 Upgrade to Avro 1.7.X in gora-solr (Talat UYARER)
+
+* GORA-253 Add Facebook, Linkedin, Google+, Twitter, etc plugins to website (lewismc)
+
+* GORA-244 Upgrade to Avro 1.7.X in gora-accumulo (Akber Choudhry via lewismc)
+
+* GORA-306 Ssn field is not nullable in Employee's Avro Schema (Talat UYARER via lewismc)
+
 * GORA-171 Implement Daily Rolling File Appender for localised Gora logging (lewismc)
 
 * GORA-119 implement a filter enabled scan in gora (ferdy, kturner, enis, Tien Nguyen Manh via lewismc)
 
-* GORA-290 StatefulHashMap removes the entry when put with same value (Alparslan Avci via hsaputra)
-
-* GORA-231 Provide better error handling in AccumuloStore.readMapping when file does not exist (Apostolos Giannakidis)
+* GORA-231 Provide better error handling in AccumuloStore.readMapping when file does not exist. (apgiannakidis via lewismc)
 
 * GORA-283 Specify field name for types not being considered in gora-cassandra (lewismc)
 

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/bin/gora
----------------------------------------------------------------------
diff --git a/bin/gora b/bin/gora
index e212978..d660099 100755
--- a/bin/gora
+++ b/bin/gora
@@ -110,9 +110,9 @@ fi
 
 # figure out which class to run
 if [ "$COMMAND" = "goracompiler" ] ; then
-  MODULE=gora-core
+  MODULE=gora-compiler-cli
   CLASSPATH=$CLASSPATH:$GORA_HOME/$MODULE/target/classes/
-  CLASS=org.apache.gora.compiler.GoraCompiler
+  CLASS=org.apache.gora.compiler.cli.GoraCompilerCLI
 elif [ "$COMMAND" = "specificcompiler" ] ; then
   MODULE=gora-core
   CLASSPATH=$CLASSPATH:$GORA_HOME/$MODULE/target/classes/

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-accumulo/pom.xml
----------------------------------------------------------------------
diff --git a/gora-accumulo/pom.xml b/gora-accumulo/pom.xml
index a7f9f34..ded3261 100644
--- a/gora-accumulo/pom.xml
+++ b/gora-accumulo/pom.xml
@@ -1,22 +1,17 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
+		contributor license agreements. See the NOTICE file distributed with this 
+		work for additional information regarding copyright ownership. The ASF licenses 
+		this file to You under the Apache License, Version 2.0 (the "License"); you 
+		may not use this file except in compliance with the License. You may obtain 
+		a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
+		required by applicable law or agreed to in writing, software distributed 
+		under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+		OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+		the specific language governing permissions and limitations under the License. -->
 
-        http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-    -->
-    
 	<modelVersion>4.0.0</modelVersion>
 
 	<parent>
@@ -114,12 +109,12 @@
         <dependency>
            <groupId>org.apache.accumulo</groupId>
            <artifactId>accumulo-core</artifactId>
-           <version>1.4.0</version>
+           <version>1.5.1</version>
         </dependency>
 
         <!-- Hadoop Dependencies -->
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
+            <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
         </dependency>
 
@@ -145,6 +140,12 @@
             <artifactId>hadoop-test</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+           
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/BinaryEncoder.java
----------------------------------------------------------------------
diff --git a/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/BinaryEncoder.java b/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/BinaryEncoder.java
index d3fffec..7368993 100644
--- a/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/BinaryEncoder.java
+++ b/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/BinaryEncoder.java
@@ -33,6 +33,7 @@ public class BinaryEncoder implements Encoder {
   
   public byte[] encodeShort(short s, byte ret[]) {
     try {
+      @SuppressWarnings("resource")
       DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
       dos.writeShort(s);
       return ret;
@@ -57,6 +58,7 @@ public class BinaryEncoder implements Encoder {
   
   public byte[] encodeInt(int i, byte ret[]) {
     try {
+      @SuppressWarnings("resource")
       DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
       dos.writeInt(i);
       return ret;
@@ -81,6 +83,7 @@ public class BinaryEncoder implements Encoder {
   
   public byte[] encodeLong(long l, byte ret[]) {
     try {
+      @SuppressWarnings("resource")
       DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
       dos.writeLong(l);
       return ret;
@@ -106,6 +109,7 @@ public class BinaryEncoder implements Encoder {
   public byte[] encodeDouble(double d, byte[] ret) {
     try {
       long l = Double.doubleToRawLongBits(d);
+      @SuppressWarnings("resource")
       DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
       dos.writeLong(l);
       return ret;
@@ -131,6 +135,7 @@ public class BinaryEncoder implements Encoder {
   public byte[] encodeFloat(float f, byte[] ret) {
     try {
       int i = Float.floatToRawIntBits(f);
+      @SuppressWarnings("resource")
       DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
       dos.writeInt(i);
       return ret;
@@ -177,6 +182,7 @@ public class BinaryEncoder implements Encoder {
   
   public byte[] encodeBoolean(boolean b, byte[] ret) {
     try {
+      @SuppressWarnings("resource")
       DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
       dos.writeBoolean(b);
       return ret;

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java
----------------------------------------------------------------------
diff --git a/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java b/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java
index 0e2f310..f525f9f 100644
--- a/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java
+++ b/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java
@@ -75,7 +75,7 @@ public class AccumuloResult<K,T extends PersistentBase> extends ResultBase<K,T>
     
     Iterator<Entry<Key,Value>> nextRow = iterator.next();
     ByteSequence row = getDataStore().populate(nextRow, persistent);
-    key = (K) ((AccumuloStore) dataStore).fromBytes(getKeyClass(), row.toArray());
+    key = (K) ((AccumuloStore<K, T>) dataStore).fromBytes(getKeyClass(), row.toArray());
     
     return true;
   }

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
----------------------------------------------------------------------
diff --git a/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java b/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
index 32dd4db..3c1911a 100644
--- a/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
+++ b/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -38,6 +39,7 @@ import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
@@ -54,6 +56,8 @@ import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mock.MockConnector;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.mock.MockTabletLocator;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
@@ -64,27 +68,27 @@ import org.apache.accumulo.core.iterators.SortedKeyIterator;
 import org.apache.accumulo.core.iterators.user.TimestampFilter;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.avro.generic.GenericArray;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
+import org.apache.gora.accumulo.encoders.BinaryEncoder;
 import org.apache.gora.accumulo.encoders.Encoder;
 import org.apache.gora.accumulo.query.AccumuloQuery;
 import org.apache.gora.accumulo.query.AccumuloResult;
-import org.apache.gora.persistency.ListGenericArray;
-import org.apache.gora.persistency.State;
-import org.apache.gora.persistency.StateManager;
-import org.apache.gora.persistency.StatefulHashMap;
-import org.apache.gora.persistency.StatefulMap;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
@@ -105,7 +109,7 @@ import org.w3c.dom.NodeList;
  * 
  */
 public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T> {
-  
+
   protected static final String MOCK_PROPERTY = "accumulo.mock";
   protected static final String INSTANCE_NAME_PROPERTY = "accumulo.instance";
   protected static final String ZOOKEEPERS_NAME_PROPERTY = "accumulo.zookeepers";
@@ -116,36 +120,71 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
   private Connector conn;
   private BatchWriter batchWriter;
   private AccumuloMapping mapping;
-  private AuthInfo authInfo;
+  private TCredentials credentials;
   private Encoder encoder;
-  
+
   public static final Logger LOG = LoggerFactory.getLogger(AccumuloStore.class);
-  
-  public Object fromBytes(Schema schema, byte data[]) {
-    return fromBytes(encoder, schema, data);
+
+  public Object fromBytes(Schema schema, byte data[]) throws GoraException {
+    Schema fromSchema = null;
+    if (schema.getType() == Type.UNION) {
+      try {
+        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
+        int unionIndex = decoder.readIndex();
+        List<Schema> possibleTypes = schema.getTypes();
+        fromSchema = possibleTypes.get(unionIndex);
+        Schema effectiveSchema = possibleTypes.get(unionIndex);
+        if (effectiveSchema.getType() == Type.NULL) {
+          decoder.readNull();
+          return null;
+        } else {
+          data = decoder.readBytes(null).array();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        throw new GoraException("Error decoding union type: ", e);
+      }
+    } else {
+      fromSchema = schema;
+    }
+    return fromBytes(encoder, fromSchema, data);
   }
 
   public static Object fromBytes(Encoder encoder, Schema schema, byte data[]) {
     switch (schema.getType()) {
-      case BOOLEAN:
-        return encoder.decodeBoolean(data);
-      case DOUBLE:
-        return encoder.decodeDouble(data);
-      case FLOAT:
-        return encoder.decodeFloat(data);
-      case INT:
-        return encoder.decodeInt(data);
-      case LONG:
-        return encoder.decodeLong(data);
-      case STRING:
-        return new Utf8(data);
-      case BYTES:
-        return ByteBuffer.wrap(data);
-      case ENUM:
-        return AvroUtils.getEnumValue(schema, encoder.decodeInt(data));
+    case BOOLEAN:
+      return encoder.decodeBoolean(data);
+    case DOUBLE:
+      return encoder.decodeDouble(data);
+    case FLOAT:
+      return encoder.decodeFloat(data);
+    case INT:
+      return encoder.decodeInt(data);
+    case LONG:
+      return encoder.decodeLong(data);
+    case STRING:
+      return new Utf8(data);
+    case BYTES:
+      return ByteBuffer.wrap(data);
+    case ENUM:
+      return AvroUtils.getEnumValue(schema, encoder.decodeInt(data));
+    case ARRAY:
+      break;
+    case FIXED:
+      break;
+    case MAP:
+      break;
+    case NULL:
+      break;
+    case RECORD:
+      break;
+    case UNION:
+      break;
+    default:
+      break;
     }
     throw new IllegalArgumentException("Unknown type " + schema.getType());
-    
+
   }
 
   public K fromBytes(Class<K> clazz, byte[] val) {
@@ -174,7 +213,7 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
       } else if (clazz.equals(Utf8.class)) {
         return (K) new Utf8(val);
       }
-      
+
       throw new IllegalArgumentException("Unknown type " + clazz.getName());
     } catch (IOException ioe) {
       throw new RuntimeException(ioe);
@@ -190,17 +229,67 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
     return b;
   }
 
+  public byte[] toBytes(Schema toSchema, Object o) {
+    if (toSchema != null && toSchema.getType() == Type.UNION) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      org.apache.avro.io.BinaryEncoder avroEncoder = EncoderFactory.get().binaryEncoder(baos, null);
+      int unionIndex = 0;
+      try {
+        if (o == null) {
+          unionIndex = firstNullSchemaTypeIndex(toSchema);
+          avroEncoder.writeIndex(unionIndex);
+          avroEncoder.writeNull();
+        } else {
+          unionIndex = firstNotNullSchemaTypeIndex(toSchema);
+          avroEncoder.writeIndex(unionIndex);
+          avroEncoder.writeBytes(toBytes(o));
+        }
+        avroEncoder.flush();
+        return baos.toByteArray();
+      } catch (IOException e) {
+        e.printStackTrace();
+        return toBytes(o);
+      }
+    } else {     
+      return toBytes(o);
+    }
+  }
+
+  private int firstNullSchemaTypeIndex(Schema toSchema) {
+    List<Schema> possibleTypes = toSchema.getTypes();
+    int unionIndex = 0;
+    for (int i = 0; i < possibleTypes.size(); i++ ) {
+      Type pType = possibleTypes.get(i).getType();
+      if (pType == Type.NULL) { // FIXME HUGE kludge to pass tests
+        unionIndex = i; break;
+      }
+    }
+    return unionIndex;
+  }
+
+  private int firstNotNullSchemaTypeIndex(Schema toSchema) {
+    List<Schema> possibleTypes = toSchema.getTypes();
+    int unionIndex = 0;
+    for (int i = 0; i < possibleTypes.size(); i++ ) {
+      Type pType = possibleTypes.get(i).getType();
+      if (pType != Type.NULL) { // FIXME HUGE kludge to pass tests
+        unionIndex = i; break;
+      }
+    }
+    return unionIndex;
+  }
+
   public byte[] toBytes(Object o) {
     return toBytes(encoder, o);
   }
-  
+
   public static byte[] toBytes(Encoder encoder, Object o) {
-    
+
     try {
       if (o instanceof String) {
         return ((String) o).getBytes("UTF-8");
       } else if (o instanceof Utf8) {
-        return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8) o).getLength());
+        return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8) o).getByteLength());
       } else if (o instanceof ByteBuffer) {
         return copyIfNeeded(((ByteBuffer) o).array(), ((ByteBuffer) o).arrayOffset() + ((ByteBuffer) o).position(), ((ByteBuffer) o).remaining());
       } else if (o instanceof Long) {
@@ -218,19 +307,23 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
       } else if (o instanceof Double) {
         return encoder.encodeDouble((Double) o);
       } else if (o instanceof Enum) {
-        return encoder.encodeInt(((Enum) o).ordinal());
+        return encoder.encodeInt(((Enum<?>) o).ordinal());
       }
     } catch (IOException ioe) {
       throw new RuntimeException(ioe);
     }
-    
+
     throw new IllegalArgumentException("Uknown type " + o.getClass().getName());
   }
 
   private BatchWriter getBatchWriter() throws IOException {
     if (batchWriter == null)
       try {
-        batchWriter = conn.createBatchWriter(mapping.tableName, 10000000, 60000l, 4);
+        BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
+        batchWriterConfig.setMaxMemory(10000000);
+        batchWriterConfig.setMaxLatency(60000l, TimeUnit.MILLISECONDS);
+        batchWriterConfig.setMaxWriteThreads(4);
+        batchWriter = conn.createBatchWriter(mapping.tableName, batchWriterConfig);
       } catch (TableNotFoundException e) {
         throw new IOException(e);
       }
@@ -241,16 +334,16 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
   public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) {
     try{
       super.initialize(keyClass, persistentClass, properties);
-  
+
       String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null);
       String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
       String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null);
       String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null);
-      
+
       mapping = readMapping(mappingFile);
-  
+
       if (mapping.encoder == null || mapping.encoder.equals("")) {
-        encoder = new org.apache.gora.accumulo.encoders.BinaryEncoder();
+        encoder = new BinaryEncoder();
       } else {
         try {
           encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
@@ -262,17 +355,23 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
           throw new IOException(e);
         }
       }
-  
+
       try {
+        AuthenticationToken token =  new PasswordToken(password);
         if (mock == null || !mock.equals("true")) {
           String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null);
           String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null);
-          conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, password);
-          authInfo = new AuthInfo(user, ByteBuffer.wrap(password.getBytes()), conn.getInstance().getInstanceID());
+          credentials = new TCredentials(user, 
+              "org.apache.accumulo.core.client.security.tokens.PasswordToken", 
+              ByteBuffer.wrap(password.getBytes()), instance);
+          conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, token);
         } else {
-          conn = new MockInstance().getConnector(user, password);
+          conn = new MockInstance().getConnector(user, new PasswordToken(password));
+          credentials = new TCredentials(user, 
+              "org.apache.accumulo.core.client.security.tokens.PasswordToken", 
+              ByteBuffer.wrap(password.getBytes()), conn.getInstance().getInstanceID());
         }
-  
+
         if (autoCreateSchema)
           createSchema();
       } catch (AccumuloException e) {
@@ -284,27 +383,27 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
       LOG.error(e.getMessage(), e);
     }
   }
-  
+
   protected AccumuloMapping readMapping(String filename) throws IOException {
     try {
-      
+
       AccumuloMapping mapping = new AccumuloMapping();
 
       DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder();
       Document dom = db.parse(getClass().getClassLoader().getResourceAsStream(filename));
-      
+
       Element root = dom.getDocumentElement();
-      
+
       NodeList nl = root.getElementsByTagName("class");
       for (int i = 0; i < nl.getLength(); i++) {
-        
+
         Element classElement = (Element) nl.item(i);
         if (classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName())
             && classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) {
 
           mapping.tableName = getSchemaName(classElement.getAttribute("table"), persistentClass);
           mapping.encoder = classElement.getAttribute("encoder");
-          
+
           NodeList fields = classElement.getElementsByTagName("field");
           for (int j = 0; j < fields.getLength(); j++) {
             Element fieldElement = (Element) fields.item(j);
@@ -324,9 +423,9 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
       }
 
       if (mapping.tableName == null) {
-        throw new GoraException("Please define the gora to accumulo mapping in " + filename + " for " + persistentClass.getCanonicalName());
+        throw new GoraException("Please define the accumulo 'table' name mapping in " + filename + " for " + persistentClass.getCanonicalName());
       }
-      
+
       nl = root.getElementsByTagName("table");
       for (int i = 0; i < nl.getLength(); i++) {
         Element tableElement = (Element) nl.item(i);
@@ -347,12 +446,12 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
     }
 
   }
-  
+
   @Override
   public String getSchemaName() {
     return mapping.tableName;
   }
-  
+
   @Override
   public void createSchema() {
     try {
@@ -394,20 +493,30 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
 
   public ByteSequence populate(Iterator<Entry<Key,Value>> iter, T persistent) throws IOException {
     ByteSequence row = null;
-    
-    Map currentMap = null;
-    ArrayList currentArray = null;
+
+    Map<Utf8, Object> currentMap = null;
+    List currentArray = null;
     Text currentFam = null;
     int currentPos = 0;
     Schema currentSchema = null;
     Field currentField = null;
 
+    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new byte[0], null);
+
     while (iter.hasNext()) {
       Entry<Key,Value> entry = iter.next();
-      
+
+      if (row == null) {
+        row = entry.getKey().getRowData();
+      }
+      byte[] val = entry.getValue().get();
+
+      Field field = fieldMap.get(getFieldName(entry));
+
       if (currentMap != null) {
         if (currentFam.equals(entry.getKey().getColumnFamily())) {
-          currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), fromBytes(currentSchema, entry.getValue().get()));
+          currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), 
+              fromBytes(currentSchema, entry.getValue().get()));
           continue;
         } else {
           persistent.put(currentPos, currentMap);
@@ -418,57 +527,69 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
           currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
           continue;
         } else {
-          persistent.put(currentPos, new ListGenericArray<T>(currentField.schema(), currentArray));
+          persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray));
           currentArray = null;
         }
       }
 
-      if (row == null)
-        row = entry.getKey().getRowData();
-      
-      String fieldName = mapping.columnMap.get(new Pair<Text,Text>(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()));
-      if (fieldName == null)
-        fieldName = mapping.columnMap.get(new Pair<Text,Text>(entry.getKey().getColumnFamily(), null));
+      switch (field.schema().getType()) {
+      case MAP:  // first entry only. Next are handled above on the next loop
+        currentMap = new DirtyMapWrapper<Utf8, Object>(new HashMap<Utf8, Object>());
+        currentPos = field.pos();
+        currentFam = entry.getKey().getColumnFamily();
+        currentSchema = field.schema().getValueType();
 
-      Field field = fieldMap.get(fieldName);
+        currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), 
+            fromBytes(currentSchema, entry.getValue().get()));
+        break;
+      case ARRAY:
+        currentArray = new DirtyListWrapper<Object>(new ArrayList<Object>());
+        currentPos = field.pos();
+        currentFam = entry.getKey().getColumnFamily();
+        currentSchema = field.schema().getElementType();
+        currentField = field;
 
-      switch (field.schema().getType()) {
-        case MAP:
-          currentMap = new StatefulHashMap();
-          currentPos = field.pos();
-          currentFam = entry.getKey().getColumnFamily();
-          currentSchema = field.schema().getValueType();
-          
-          currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), fromBytes(currentSchema, entry.getValue().get()));
+        currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
 
-          break;
-        case ARRAY:
-          currentArray = new ArrayList();
+        break;
+      case UNION:// default value of null acts like union with null
+        Schema effectiveSchema = field.schema().getTypes()
+        .get(firstNotNullSchemaTypeIndex(field.schema()));
+        // maps and arrays were written without a union index, so they must be read back the same way
+        if (effectiveSchema.getType() == Type.ARRAY) {
+          currentArray = new DirtyListWrapper<Object>(new ArrayList<Object>());
           currentPos = field.pos();
           currentFam = entry.getKey().getColumnFamily();
           currentSchema = field.schema().getElementType();
           currentField = field;
-          
-          currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
 
+          currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
           break;
-        case RECORD:
-        case UNION:
-          SpecificDatumReader reader = new SpecificDatumReader(field.schema());
-          byte[] val = entry.getValue().get();
-          // TODO reuse decoder
-          BinaryDecoder decoder = DecoderFactory.defaultFactory().createBinaryDecoder(val, null);
-          persistent.put(field.pos(), reader.read(null, decoder));
+        }
+        else if (effectiveSchema.getType() == Type.MAP) {
+          currentMap = new DirtyMapWrapper<Utf8, Object>(new HashMap<Utf8, Object>());
+          currentPos = field.pos();
+          currentFam = entry.getKey().getColumnFamily();
+          currentSchema = effectiveSchema.getValueType();
+
+          currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), 
+              fromBytes(currentSchema, entry.getValue().get()));
           break;
-        default:
-          persistent.put(field.pos(), fromBytes(field.schema(), entry.getValue().get()));
+        }
+        // intentional fall-through: handle it like a regular top-level union (RECORD case below)
+      case RECORD:
+        SpecificDatumReader<?> reader = new SpecificDatumReader<Schema>(field.schema());
+        persistent.put(field.pos(), reader.read(null, DecoderFactory.get().binaryDecoder(val, decoder)));
+        break;
+      default:
+        persistent.put(field.pos(), fromBytes(field.schema(), entry.getValue().get()));
       }
     }
-    
+
     if (currentMap != null) {
       persistent.put(currentPos, currentMap);
     } else if (currentArray != null) {
-      persistent.put(currentPos, new ListGenericArray<T>(currentField.schema(), currentArray));
+      persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray));
     }
 
     persistent.clearDirty();
@@ -476,14 +597,32 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
     return row;
   }
 
+  /**
+   * Retrieve field name from entry.
+   * @param entry The Key-Value entry
+   * @return String The field name
+   */
+  private String getFieldName(Entry<Key, Value> entry) {
+    String fieldName = mapping.columnMap.get(new Pair<Text,Text>(entry.getKey().getColumnFamily(), 
+        entry.getKey().getColumnQualifier()));
+    if (fieldName == null) {
+      fieldName = mapping.columnMap.get(new Pair<Text,Text>(entry.getKey().getColumnFamily(), null));
+    }
+    return fieldName;
+  }
+
   private void setFetchColumns(Scanner scanner, String fields[]) {
     fields = getFieldsToQuery(fields);
     for (String field : fields) {
       Pair<Text,Text> col = mapping.fieldMap.get(field);
-      if (col.getSecond() == null) {
-        scanner.fetchColumnFamily(col.getFirst());
+      if (col != null) {
+        if (col.getSecond() == null) {
+          scanner.fetchColumnFamily(col.getFirst());
+        } else {
+          scanner.fetchColumn(col.getFirst(), col.getSecond());
+        }
       } else {
-        scanner.fetchColumn(col.getFirst(), col.getSecond());
+        LOG.error("Mapping not found for field: " + field);
       }
     }
   }
@@ -494,10 +633,10 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
       // TODO make isolated scanner optional?
       Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Constants.NO_AUTHS));
       Range rowRange = new Range(new Text(toBytes(key)));
-      
+
       scanner.setRange(rowRange);
       setFetchColumns(scanner, fields);
-      
+
       T persistent = newPersistent();
       ByteSequence row = populate(scanner.iterator(), persistent);
       if (row == null)
@@ -511,90 +650,67 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
       return null;
     }
   }
-  
+
   @Override
   public void put(K key, T val) {
 
     try{
       Mutation m = new Mutation(new Text(toBytes(key)));
-      
+
       Schema schema = val.getSchema();
-      StateManager stateManager = val.getStateManager();
-      
-      Iterator<Field> iter = schema.getFields().iterator();
-      
+      List<Field> fields = schema.getFields();
       int count = 0;
-      for (int i = 0; iter.hasNext(); i++) {
-        Field field = iter.next();
-        if (!stateManager.isDirty(val, i)) {
+
+      for (int i = 1; i < fields.size(); i++) {
+        if (!val.isDirty(i)) {
           continue;
         }
-        
-        Object o = val.get(i);
+        Field field = fields.get(i);
+
+        Object o = val.get(field.pos());       
+
         Pair<Text,Text> col = mapping.fieldMap.get(field.name());
 
         if (col == null) {
           throw new GoraException("Please define the gora to accumulo mapping for field " + field.name());
         }
 
-  
         switch (field.schema().getType()) {
-          case MAP:
-            if (o instanceof StatefulMap) {
-              StatefulMap map = (StatefulMap) o;
-              Set<?> es = map.states().entrySet();
-              for (Object entry : es) {
-                Object mapKey = ((Entry) entry).getKey();
-                State state = (State) ((Entry) entry).getValue();
-  
-                switch (state) {
-                  case NEW:
-                  case DIRTY:
-                    m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(map.get(mapKey))));
-                    count++;
-                    break;
-                  case DELETED:
-                    m.putDelete(col.getFirst(), new Text(toBytes(mapKey)));
-                    count++;
-                    break;
-                }
-                
-              }
-            } else {
-              Map map = (Map) o;
-              Set<?> es = map.entrySet();
-              for (Object entry : es) {
-                Object mapKey = ((Entry) entry).getKey();
-                Object mapVal = ((Entry) entry).getValue();
-                m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(mapVal)));
-                count++;
-              }
-            }
-            break;
-          case ARRAY:
-            GenericArray array = (GenericArray) o;
-            int j = 0;
-            for (Object item : array) {
-              m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item)));
-              count++;
-            }
+        case MAP:
+          count = putMap(m, count, field.schema().getValueType(), o, col);
+          break;
+        case ARRAY:
+          count = putArray(m, count, o, col);
+          break;
+        case UNION: // default value of null acts like union with null
+          Schema effectiveSchema = field.schema().getTypes()
+          .get(firstNotNullSchemaTypeIndex(field.schema()));
+          // map and array need to compute qualifier
+          if (effectiveSchema.getType() == Type.ARRAY) {
+            count = putArray(m, count, o, col);
             break;
-          case RECORD:
-          case UNION:
-            SpecificDatumWriter writer = new SpecificDatumWriter(field.schema());
-            ByteArrayOutputStream os = new ByteArrayOutputStream();
-            BinaryEncoder encoder = new BinaryEncoder(os);
-            writer.write(o, encoder);
-            encoder.flush();
-            m.put(col.getFirst(), col.getSecond(), new Value(os.toByteArray()));
+          }
+          else if (effectiveSchema.getType() == Type.MAP) {
+            count = putMap(m, count, effectiveSchema.getValueType(), o, col);
             break;
-          default:
-            m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o)));
-            count++;
+          }
+          // intentional fall-through: handle it like a regular top-level union (RECORD case below)
+        case RECORD:
+          SpecificDatumWriter<Object> writer = new SpecificDatumWriter<Object>(field.schema());
+          ByteArrayOutputStream os = new ByteArrayOutputStream();
+          org.apache.avro.io.BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
+          writer.write(o, encoder);
+          encoder.flush();
+          m.put(col.getFirst(), col.getSecond(), new Value(os.toByteArray()));
+          count++;
+          break;
+        default:
+          m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o)));
+          count++;
         }
-  
+
       }
-      
+
       if (count > 0)
         try {
           getBatchWriter().addMutation(m);
@@ -605,7 +721,58 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
       LOG.error(e.getMessage(), e);
     }
   }
-  
+
+  private int putMap(Mutation m, int count, Schema valueType, Object o, Pair<Text, Text> col) throws GoraException {
+
+    // First, delete any existing entries for this map field from the Accumulo store
+    Text rowKey = new Text(m.getRow());
+    Query<K, T> query = newQuery();
+    query.setFields(col.getFirst().toString());
+    query.setStartKey((K)rowKey.toString());
+    query.setEndKey((K)rowKey.toString());
+    deleteByQuery(query);
+    flush();
+    if (o == null){
+      return 0;
+    }
+    
+    Set<?> es = ((Map<?, ?>)o).entrySet();
+    for (Object entry : es) {
+      Object mapKey = ((Entry<?, ?>) entry).getKey();
+      Object mapVal = ((Entry<?, ?>) entry).getValue();                  
+      if ((o instanceof DirtyMapWrapper && ((DirtyMapWrapper<?, ?>)o).isDirty())
+          || !(o instanceof DirtyMapWrapper)) { //mapVal instanceof Dirtyable && ((Dirtyable)mapVal).isDirty()) {
+        m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(valueType, mapVal)));
+        count++;
+      }
+      // TODO map value deletion
+    }
+    return count;
+  }
+
+  private int putArray(Mutation m, int count, Object o, Pair<Text, Text> col) {
+
+    // First, delete any existing entries for this array field from the Accumulo store
+    Text rowKey = new Text(m.getRow());
+    Query<K, T> query = newQuery();
+    query.setFields(col.getFirst().toString());
+    query.setStartKey((K)rowKey.toString());
+    query.setEndKey((K)rowKey.toString());
+    deleteByQuery(query);
+    flush();
+    if (o == null){
+      return 0;
+    }
+    
+    List<?> array = (List<?>) o;  // both GenericArray and DirtyListWrapper
+    int j = 0;
+    for (Object item : array) {
+      m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item)));
+      count++;
+    }
+    return count;
+  }
+
   @Override
   public boolean delete(K key) {
     Query<K,T> q = newQuery();
@@ -620,7 +787,7 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
       // add iterator that drops values on the server side
       scanner.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, SortedKeyIterator.class));
       RowIterator iterator = new RowIterator(scanner.iterator());
-      
+
       long count = 0;
 
       while (iterator.hasNext()) {
@@ -637,7 +804,7 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
         getBatchWriter().addMutation(m);
         count++;
       }
-      
+
       return count;
     } catch (TableNotFoundException e) {
       // TODO return 0?
@@ -655,34 +822,34 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
   private Range createRange(Query<K,T> query) {
     Text startRow = null;
     Text endRow = null;
-    
+
     if (query.getStartKey() != null)
       startRow = new Text(toBytes(query.getStartKey()));
-    
+
     if (query.getEndKey() != null)
       endRow = new Text(toBytes(query.getEndKey()));
-    
+
     return new Range(startRow, true, endRow, true);
-    
+
   }
-  
+
   private Scanner createScanner(Query<K,T> query) throws TableNotFoundException {
     // TODO make isolated scanner optional?
     Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Constants.NO_AUTHS));
     setFetchColumns(scanner, query.getFields());
-    
+
     scanner.setRange(createRange(query));
-    
+
     if (query.getStartTime() != -1 || query.getEndTime() != -1) {
       IteratorSetting is = new IteratorSetting(30, TimestampFilter.class);
       if (query.getStartTime() != -1)
         TimestampFilter.setStart(is, query.getStartTime(), true);
       if (query.getEndTime() != -1)
         TimestampFilter.setEnd(is, query.getEndTime(), true);
-      
+
       scanner.addScanIterator(is);
     }
-    
+
     return scanner;
   }
 
@@ -697,7 +864,7 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
       return null;
     } 
   }
-  
+
   @Override
   public Query<K,T> newQuery() {
     return new AccumuloQuery<K,T>(this);
@@ -706,14 +873,14 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
   Text pad(Text key, int bytes) {
     if (key.getLength() < bytes)
       key = new Text(key);
-    
+
     while (key.getLength() < bytes) {
       key.append(new byte[] {0}, 0, 1);
     }
-    
+
     return key;
   }
-  
+
   @Override
   public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws IOException {
     try {
@@ -721,12 +888,12 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
       if (conn instanceof MockConnector)
         tl = new MockTabletLocator();
       else
-        tl = TabletLocator.getInstance(conn.getInstance(), authInfo, new Text(Tables.getTableId(conn.getInstance(), mapping.tableName)));
-      
+        tl = TabletLocator.getInstance(conn.getInstance(), new Text(Tables.getTableId(conn.getInstance(), mapping.tableName)));
+
       Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-      
+
       tl.invalidateCache();
-      while (tl.binRanges(Collections.singletonList(createRange(query)), binnedRanges).size() > 0) {
+      while (tl.binRanges(Collections.singletonList(createRange(query)), binnedRanges, credentials).size() > 0) {
         // TODO log?
         if (!Tables.exists(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName)))
           throw new TableDeletedException(Tables.getTableId(conn.getInstance(), mapping.tableName));
@@ -735,19 +902,19 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
         UtilWaitThread.sleep(100);
         tl.invalidateCache();
       }
-      
+
       List<PartitionQuery<K,T>> ret = new ArrayList<PartitionQuery<K,T>>();
-      
+
       Text startRow = null;
       Text endRow = null;
       if (query.getStartKey() != null)
         startRow = new Text(toBytes(query.getStartKey()));
       if (query.getEndKey() != null)
         endRow = new Text(toBytes(query.getEndKey()));
-     
+
       //hadoop expects hostnames, accumulo keeps track of IPs... so need to convert
       HashMap<String,String> hostNameCache = new HashMap<String,String>();
- 
+
       for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
         String ip = entry.getKey().split(":", 2)[0];
         String location = hostNameCache.get(ip);
@@ -759,7 +926,7 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
 
         Map<KeyExtent,List<Range>> tablets = entry.getValue();
         for (KeyExtent ke : tablets.keySet()) {
-          
+
           K startKey = null;
           if (startRow == null || !ke.contains(startRow)) {
             if (ke.getPrevEndRow() != null) {
@@ -768,7 +935,7 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
           } else {
             startKey = fromBytes(getKeyClass(), TextUtil.getBytes(startRow));
           }
-          
+
           K endKey = null;
           if (endRow == null || !ke.contains(endRow)) {
             if (ke.getEndRow() != null)
@@ -776,13 +943,13 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
           } else {
             endKey = fromBytes(getKeyClass(), TextUtil.getBytes(endRow));
           }
-          
-          PartitionQueryImpl pqi = new PartitionQueryImpl<K,T>(query, startKey, endKey, new String[] {location});
+
+          PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<K,T>(query, startKey, endKey, new String[] {location});
           pqi.setConf(getConf());
           ret.add(pqi);
         }
       }
-      
+
       return ret;
     } catch (TableNotFoundException e) {
       throw new IOException(e);
@@ -791,11 +958,11 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
     } catch (AccumuloSecurityException e) {
       throw new IOException(e);
     }
-    
+
   }
-  
+
   static <K> K lastPossibleKey(Encoder encoder, Class<K> clazz, byte[] er) {
-    
+
     if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
       throw new UnsupportedOperationException();
     } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
@@ -815,19 +982,20 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
     } else if (clazz.equals(Utf8.class)) {
       return fromBytes(encoder, clazz, er);
     }
-    
+
     throw new IllegalArgumentException("Unknown type " + clazz.getName());
   }
 
 
-  
+
   /**
    * @param keyClass
    * @param bytes
    * @return
    */
+  @SuppressWarnings("unchecked")
   static <K> K followingKey(Encoder encoder, Class<K> clazz, byte[] per) {
-    
+
     if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
       return (K) Byte.valueOf(encoder.followingKey(1, per)[0]);
     } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml
----------------------------------------------------------------------
diff --git a/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml b/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml
index 57104a9..362a2bf 100644
--- a/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml
+++ b/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml
@@ -47,6 +47,7 @@
     <field name="content" family="content" qualifier="c"/>
     <field name="parsedContent" family="parsedContent"/>
     <field name="outlinks" family="outlinks"/>
+    <field name="headers" family="headers"/>
     <field name="metadata" family="common" qualifier="metadata"/>
   </class>
 

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-accumulo/src/test/resources/gora.properties
----------------------------------------------------------------------
diff --git a/gora-accumulo/src/test/resources/gora.properties b/gora-accumulo/src/test/resources/gora.properties
index 21a7e56..f89a360 100644
--- a/gora-accumulo/src/test/resources/gora.properties
+++ b/gora-accumulo/src/test/resources/gora.properties
@@ -18,4 +18,4 @@ gora.datastore.accumulo.mock=true
 gora.datastore.accumulo.instance=a14
 gora.datastore.accumulo.zookeepers=localhost
 gora.datastore.accumulo.user=root
-gora.datastore.accumulo.password=secret
\ No newline at end of file
+gora.datastore.accumulo.password=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/gora-cassandra/pom.xml b/gora-cassandra/pom.xml
index a6529c6..95e63ac 100644
--- a/gora-cassandra/pom.xml
+++ b/gora-cassandra/pom.xml
@@ -35,21 +35,21 @@
     support.</description>
     <inceptionYear>2010</inceptionYear>
     <organization>
-    	<name>The Apache Software Foundation</name>
-    	<url>http://www.apache.org/</url>
+        <name>The Apache Software Foundation</name>
+        <url>http://www.apache.org/</url>
     </organization>
     <scm>
-    	<url>http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/</url>
-    	<connection>scm:svn:http://svn.apache.org/repos/asf/gora/trunk/gora-cassandra/</connection>
-    	<developerConnection>scm:svn:https://svn.apache.org/repos/asf/gora/trunk/gora-cassandra/</developerConnection>
+        <url>http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/</url>
+        <connection>scm:svn:http://svn.apache.org/repos/asf/gora/trunk/gora-cassandra/</connection>
+        <developerConnection>scm:svn:https://svn.apache.org/repos/asf/gora/trunk/gora-cassandra/</developerConnection>
     </scm>
     <issueManagement>
-    	<system>JIRA</system>
-    	<url>https://issues.apache.org/jira/browse/GORA</url>
+        <system>JIRA</system>
+        <url>https://issues.apache.org/jira/browse/GORA</url>
     </issueManagement>
     <ciManagement>
-    	<system>Jenkins</system>
-    	<url>https://builds.apache.org/job/Gora-trunk/</url>
+        <system>Jenkins</system>
+        <url>https://builds.apache.org/job/Gora-trunk/</url>
     </ciManagement>
 
     <properties>
@@ -127,15 +127,15 @@
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.cassandra.deps</groupId>
-    		    <artifactId>avro</artifactId>
+                    <artifactId>avro</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
         
-        <dependency>
+        <!-- dependency>
             <groupId>org.apache.cassandra</groupId>
             <artifactId>cassandra-thrift</artifactId>
-        </dependency>
+        </dependency-->
 
         <dependency>
             <groupId>org.hectorclient</groupId>
@@ -143,7 +143,7 @@
                 <exclusions>
                     <exclusion>
                         <groupId>org.apache.cassandra</groupId>
-    		        <artifactId>cassandra-all</artifactId>
+                        <artifactId>cassandra-all</artifactId>
                     </exclusion>
             </exclusions>
         </dependency>
@@ -159,11 +159,13 @@
             <artifactId>jdom</artifactId>
         </dependency>
 
-        <!-- Logging Dependencies -->
+        
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
+            <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
         </dependency>
+        
+        <!-- Logging Dependencies -->
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
index a795a77..d792685 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
@@ -18,12 +18,19 @@
 
 package org.apache.gora.cassandra.query;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
 
 import me.prettyprint.hector.api.Serializer;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.gora.cassandra.serializers.AvroSerializerUtil;
 import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +48,7 @@ public abstract class CassandraColumn {
   private int type;
   private Field field;
   private int unionType;
-
+  
   public void setUnionType(int pUnionType){
     this.unionType = pUnionType;
   }
@@ -72,7 +79,7 @@ public abstract class CassandraColumn {
 
   public abstract ByteBuffer getName();
   public abstract Object getValue();
-
+  
   protected Object fromByteBuffer(Schema schema, ByteBuffer byteBuffer) {
     Object value = null;
     Serializer<?> serializer = GoraSerializerTypeInferer.getSerializer(schema);
@@ -81,8 +88,14 @@ public abstract class CassandraColumn {
           + "could be found. Please report this to dev@gora.apache.org");
     } else {
       value = serializer.fromByteBuffer(byteBuffer);
+      if (schema.getType().equals(Type.RECORD) || schema.getType().equals(Type.MAP) ){
+        try {
+          value = AvroSerializerUtil.deserializer(value, schema);
+        } catch (IOException e) {
+          LOG.warn(field.name() + " named field could not be deserialized.");
+        }
+      }
     }
     return value;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
index cd17453..61dd5bb 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
@@ -72,7 +72,7 @@ public class CassandraResult<K, T extends PersistentBase> extends ResultBase<K,
     
     for (int iCnt = 0; iCnt < pCassandraRow.length; iCnt++){
       CassandraColumn cColumn = (CassandraColumn)pCassandraRow[iCnt];
-      String columnName = StringSerializer.get().fromByteBuffer(cColumn.getName());
+      String columnName = StringSerializer.get().fromByteBuffer(cColumn.getName().duplicate());
       if (pFieldName.equals(columnName))
         return cColumn;
     }
@@ -95,37 +95,50 @@ public class CassandraResult<K, T extends PersistentBase> extends ResultBase<K,
     List<Field> fields = schema.getFields();
     
     for (CassandraColumn cassandraColumn: cassandraRow) {
-      
       // get field name
-      String family = cassandraColumn.getFamily();
-      String fieldName = this.reverseMap.get(family + ":" + StringSerializer.get().fromByteBuffer(cassandraColumn.getName()));
+      String family = cassandraColumn.getFamily();  
+      
+      String fieldName = this.reverseMap.get(family + ":" + StringSerializer.get().fromByteBuffer(cassandraColumn.getName().duplicate()));
       
-      if (fieldName != null ){
+      if (fieldName != null) {
         // get field
-        int pos = this.persistent.getFieldIndex(fieldName);
-        Field field = fields.get(pos);
-        Type fieldType = field.schema().getType();
-        System.out.println(StringSerializer.get().fromByteBuffer(cassandraColumn.getName()) + fieldName + " " + fieldType.name());
-        if (fieldType == Type.UNION){
-          // TODO getting UNION stored type
-          // TODO get value of UNION stored type. This field does not need to be written back to the store
-          cassandraColumn.setUnionType(getNonNullTypePos(field.schema().getTypes()));
+        if (fieldName.indexOf(CassandraStore.UNION_COL_SUFIX) < 0) {
+
+          int pos = this.persistent.getSchema().getField(fieldName).pos();
+          Field field = fields.get(pos);
+          Type fieldType = field.schema().getType();
+          // LOG.info(StringSerializer.get().fromByteBuffer(cassandraColumn.getName())
+          // + fieldName + " " + fieldType.name());
+          if (fieldType.equals(Type.UNION)) {
+            //getting UNION stored type
+            CassandraColumn cc = getUnionTypeColumn(fieldName
+                + CassandraStore.UNION_COL_SUFIX, cassandraRow.toArray());
+            //creating temporary UNION Field
+            Field unionField = new Field(fieldName
+                + CassandraStore.UNION_COL_SUFIX, Schema.create(Type.INT),
+                null, null);
+            // get value of UNION stored type
+            cc.setField(unionField);
+            Object val = cc.getValue();
+            cassandraColumn.setUnionType(Integer.parseInt(val.toString()));
+          }
+
+          // get value
+          cassandraColumn.setField(field);
+          Object value = cassandraColumn.getValue();
+
+          this.persistent.put(pos, value);
+          // this field does not need to be written back to the store
+          this.persistent.clearDirty(pos);
         }
-
-        // get value
-        cassandraColumn.setField(field);
-        Object value = cassandraColumn.getValue();
-
-        this.persistent.put(pos, value);
-        // this field does not need to be written back to the store
-        this.persistent.clearDirty(pos);
-      }
-      else
+      } else
         LOG.debug("FieldName was null while iterating CassandraRow and using Avro Union type");
     }
 
   }
 
+  //TODO Should we remove this method?
+  @SuppressWarnings("unused")
   private int getNonNullTypePos(List<Schema> pTypes){
     int iCnt = 0;
     for (Schema sch :  pTypes)

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
index 135d47d..12dada1 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
@@ -19,40 +19,23 @@
 package org.apache.gora.cassandra.query;
 
 import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetEncoder;
+import java.util.List;
+import java.util.Map;
 
-import me.prettyprint.cassandra.serializers.FloatSerializer;
-import me.prettyprint.cassandra.serializers.DoubleSerializer;
-import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.cassandra.serializers.LongSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
 import me.prettyprint.hector.api.beans.HColumn;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.util.Utf8;
-import org.apache.gora.cassandra.serializers.GenericArraySerializer;
-import org.apache.gora.cassandra.serializers.StatefulHashMapSerializer;
-import org.apache.gora.cassandra.serializers.TypeUtils;
+import org.apache.gora.cassandra.serializers.ListSerializer;
+import org.apache.gora.cassandra.serializers.MapSerializer;
 import org.apache.gora.cassandra.store.CassandraStore;
-import org.apache.gora.persistency.StatefulHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CassandraSubColumn extends CassandraColumn {
   public static final Logger LOG = LoggerFactory.getLogger(CassandraSubColumn.class);
 
-  private static final String ENCODING = "UTF-8";
-  
-  private static CharsetEncoder charsetEncoder = Charset.forName(ENCODING).newEncoder();;
-
-
   /**
    * Key-value pair containing the raw data.
    */
@@ -62,6 +45,32 @@ public class CassandraSubColumn extends CassandraColumn {
     return hColumn.getName();
   }
 
+  private Object getFieldValue(Type type, Schema fieldSchema, ByteBuffer byteBuffer){
+    Object value = null;
+    if (type.equals(Type.ARRAY)) {
+      ListSerializer<?> serializer = ListSerializer.get(fieldSchema.getElementType());
+      List<?> genericArray = serializer.fromByteBuffer(byteBuffer);
+      value = genericArray;
+    } else if (type.equals(Type.MAP)) {
+//      MapSerializer<?> serializer = MapSerializer.get(fieldSchema.getValueType());
+//      Map<?, ?> map = serializer.fromByteBuffer(byteBuffer);
+//      value = map;
+      value = fromByteBuffer(fieldSchema, byteBuffer);
+    } else if (type.equals(Type.RECORD)){
+      value = fromByteBuffer(fieldSchema, byteBuffer);
+    } else if (type.equals(Type.UNION)){
+      // the selected union schema is obtained
+      Schema unionFieldSchema = getUnionSchema(super.getUnionType(), fieldSchema);
+      Type unionFieldType = unionFieldSchema.getType();
+      // we use the selected union schema to deserialize our actual value
+      //value = fromByteBuffer(unionFieldSchema, byteBuffer);
+      value = getFieldValue(unionFieldType, unionFieldSchema, byteBuffer);
+    } else {
+      value = fromByteBuffer(fieldSchema, byteBuffer);
+    }
+    return value;
+  }
+
   /**
    * Deserialize a String into an typed Object, according to the field schema.
    * @see org.apache.gora.cassandra.query.CassandraColumn#getValue()
@@ -74,24 +83,8 @@ public class CassandraSubColumn extends CassandraColumn {
     if (byteBuffer == null) {
       return null;
     }
-    Object value = null;
-    if (type == Type.ARRAY) {
-      GenericArraySerializer serializer = GenericArraySerializer.get(fieldSchema.getElementType());
-      GenericArray genericArray = serializer.fromByteBuffer(byteBuffer);
-      value = genericArray;
-    } else if (type == Type.MAP) {
-      StatefulHashMapSerializer serializer = StatefulHashMapSerializer.get(fieldSchema.getValueType());
-      StatefulHashMap map = serializer.fromByteBuffer(byteBuffer);
-      value = map;
-    } else if (type == Type.UNION){
-      // the selected union schema is obtained
-      Schema unionFieldSchema = getUnionSchema(super.getUnionType(), field.schema());
-      // we use the selected union schema to deserialize our actual value
-      value = fromByteBuffer(unionFieldSchema, byteBuffer);
-    } else {
-      value = fromByteBuffer(fieldSchema, byteBuffer);
-    }
 
+    Object value = getFieldValue(type, fieldSchema, byteBuffer);
     return value;
   }
   

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
index a065f0c..bbf1c34 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
@@ -19,8 +19,12 @@
 package org.apache.gora.cassandra.query;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
 import me.prettyprint.cassandra.serializers.StringSerializer;
 import me.prettyprint.hector.api.beans.HColumn;
 import me.prettyprint.hector.api.beans.HSuperColumn;
@@ -29,9 +33,8 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.util.Utf8;
-import org.apache.gora.cassandra.serializers.Utf8Serializer;
-import org.apache.gora.persistency.ListGenericArray;
-import org.apache.gora.persistency.StatefulHashMap;
+import org.apache.gora.cassandra.serializers.CharSequenceSerializer;
+import org.apache.gora.cassandra.store.CassandraStore;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,19 +48,14 @@ public class CassandraSuperColumn extends CassandraColumn {
     return StringSerializer.get().toByteBuffer(hSuperColumn.getName());
   }
 
-  public Object getValue() {
-    Field field = getField();
-    Schema fieldSchema = field.schema();
-    Type type = fieldSchema.getType();
-    
+ private Object getSuperValue(Field field, Schema fieldSchema, Type type){
     Object value = null;
     
     switch (type) {
       case ARRAY:
-        ListGenericArray array = new ListGenericArray(fieldSchema.getElementType());
+        List<Object> array = new ArrayList<Object>();
         
         for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
-          ByteBuffer memberByteBuffer = hColumn.getValue();
           Object memberValue = fromByteBuffer(fieldSchema.getElementType(), hColumn.getValue());
           // int i = IntegerSerializer().get().fromByteBuffer(hColumn.getName());
           array.add(memberValue);      
@@ -66,13 +64,26 @@ public class CassandraSuperColumn extends CassandraColumn {
         
         break;
       case MAP:
-        Map<Utf8, Object> map = new StatefulHashMap<Utf8, Object>();
-        
+        Map<CharSequence, Object> map = new HashMap<CharSequence, Object>();
+
         for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
-          ByteBuffer memberByteBuffer = hColumn.getValue();
-          Object memberValue = null;
-          memberValue = fromByteBuffer(fieldSchema.getValueType(), hColumn.getValue());
-          map.put(Utf8Serializer.get().fromByteBuffer(hColumn.getName()), memberValue);      
+          CharSequence mapKey = CharSequenceSerializer.get().fromByteBuffer(hColumn.getName());
+          if (mapKey.toString().indexOf(CassandraStore.UNION_COL_SUFIX) < 0) {
+            Object memberValue = null;
+            // We need to detect the real type for UNION fields
+            if (fieldSchema.getValueType().getType().equals(Type.UNION)){
+              
+              HColumn<ByteBuffer, ByteBuffer> cc = getUnionTypeColumn(mapKey
+                  + CassandraStore.UNION_COL_SUFIX, this.hSuperColumn.getColumns());
+              Integer unionIndex = getUnionIndex(mapKey.toString(), cc);
+              Schema realSchema = fieldSchema.getValueType().getTypes().get(unionIndex);
+              memberValue = fromByteBuffer(realSchema, hColumn.getValue());
+              
+            }else{
+              memberValue = fromByteBuffer(fieldSchema.getValueType(), hColumn.getValue());            
+            }            
+            map.put(mapKey, memberValue);      
+          }
         }
         value = map;
         
@@ -104,21 +115,77 @@ public class CassandraSuperColumn extends CassandraColumn {
 
           for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
             String memberName = StringSerializer.get().fromByteBuffer(hColumn.getName());
+            if (memberName.indexOf(CassandraStore.UNION_COL_SUFIX) < 0) {
+              
             if (memberName == null || memberName.length() == 0) {
               LOG.warn("member name is null or empty.");
               continue;
             }
             Field memberField = fieldSchema.getField(memberName);
+            Schema memberSchema = memberField.schema();
+            Type memberType = memberSchema.getType();
+            
             CassandraSubColumn cassandraColumn = new CassandraSubColumn();
             cassandraColumn.setField(memberField);
             cassandraColumn.setValue(hColumn);
-            record.put(record.getFieldIndex(memberName), cassandraColumn.getValue());
+            
+            if (memberType.equals(Type.UNION)){
+              HColumn<ByteBuffer, ByteBuffer> hc = getUnionTypeColumn(memberField.name()
+                  + CassandraStore.UNION_COL_SUFIX, this.hSuperColumn.getColumns().toArray());
+              Integer unionIndex = getUnionIndex(memberField.name(),hc);
+              cassandraColumn.setUnionType(unionIndex);
+            }
+            
+            record.put(record.getSchema().getField(memberName).pos(), cassandraColumn.getValue());
+          }
           }
         }
         break;
+      case UNION:
+        int schemaPos = this.getUnionType();
+        Schema unioSchema = fieldSchema.getTypes().get(schemaPos);
+        Type unionType = unioSchema.getType();
+        value = getSuperValue(field, unioSchema, unionType);
+        break;
       default:
-        LOG.warn("Type: " + type.name() + " not supported for field: " + field.name() + ". Please report this to dev@gora.apache.org");
+        Object memberValue = null;
+        // Used to read the value of a UNION field's union-index column; the union index is always an Integer.
+        for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
+          memberValue = fromByteBuffer(fieldSchema, hColumn.getValue());      
+        }
+        value = memberValue;
+        LOG.warn("Type: " + type.name() + " not supported for field: " + field.name());
     }
+    return value;
+  }
+
+ private Integer getUnionIndex(String fieldName, HColumn<ByteBuffer, ByteBuffer> uc){
+   Integer val = IntegerSerializer.get().fromByteBuffer(uc.getValue());
+   return Integer.parseInt(val.toString());
+ }
+ 
+  private HColumn<ByteBuffer, ByteBuffer> getUnionTypeColumn(String fieldName,
+    List<HColumn<ByteBuffer, ByteBuffer>> columns) {
+    return getUnionTypeColumn(fieldName, columns.toArray());
+}
+
+  private HColumn<ByteBuffer, ByteBuffer> getUnionTypeColumn(String fieldName, Object[] hColumns) {
+    for (int iCnt = 0; iCnt < hColumns.length; iCnt++){
+      @SuppressWarnings("unchecked")
+      HColumn<ByteBuffer, ByteBuffer> hColumn = (HColumn<ByteBuffer, ByteBuffer>)hColumns[iCnt];
+      String columnName = StringSerializer.get().fromByteBuffer(hColumn.getNameBytes().duplicate());
+      if (fieldName.equals(columnName))
+        return hColumn;
+    }
+    return null;
+}
+
+  public Object getValue() {
+    Field field = getField();
+    Schema fieldSchema = field.schema();
+    Type type = fieldSchema.getType();
+    
+    Object value = getSuperValue(field, fieldSchema, type);
     
     return value;
   }

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializerUtil.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializerUtil.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializerUtil.java
new file mode 100644
index 0000000..4bb26f8
--- /dev/null
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializerUtil.java
@@ -0,0 +1,94 @@
+package org.apache.gora.cassandra.serializers;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+public class AvroSerializerUtil {
+
+  /**
+   * Threadlocals maintaining reusable binary decoders and encoders.
+   */
+  private static ThreadLocal<ByteArrayOutputStream> outputStream =
+      new ThreadLocal<ByteArrayOutputStream>();
+  
+  public static final ThreadLocal<BinaryEncoder> encoders =
+      new ThreadLocal<BinaryEncoder>();
+
+  public static final ThreadLocal<BinaryDecoder> decoders =
+      new ThreadLocal<BinaryDecoder>();
+  
+  /**
+   * Create a {@link java.util.concurrent.ConcurrentHashMap} for the 
+   * datum readers and writers. 
+   * This is necessary because they are not thread safe, at least not before 
+   * Avro 1.4.0 (See AVRO-650).
+   * When they are thread safe, it is possible to maintain a single reader and
+   * writer pair for every schema, instead of one for every thread.
+   * @see <a href="https://issues.apache.org/jira/browse/AVRO-650">AVRO-650</a>
+   */
+  public static final ConcurrentHashMap<String, SpecificDatumWriter<?>> writerMap = 
+      new ConcurrentHashMap<String, SpecificDatumWriter<?>>();  
+  
+  public static final ConcurrentHashMap<String, SpecificDatumReader<?>> readerMap = 
+      new ConcurrentHashMap<String, SpecificDatumReader<?>>();
+  
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public static <T> byte[] serializer(T value, Schema schema) throws IOException{
+    SpecificDatumWriter writer = (SpecificDatumWriter<?>) writerMap.get(schema.getFullName());
+    if (writer == null) {
+      writer = new SpecificDatumWriter(schema);// ignore dirty bits
+      writerMap.put(schema.getFullName(),writer);
+    }
+    
+    BinaryEncoder encoderFromCache = encoders.get();
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    outputStream.set(bos);
+    BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(bos, null);
+    if (encoderFromCache == null) {
+      encoders.set(encoder);
+    }
+    
+    //reset the buffers
+    ByteArrayOutputStream os = outputStream.get();
+    os.reset();
+    
+    writer.write(value, encoder);
+    encoder.flush();
+    byte[] byteValue = os.toByteArray();
+    return byteValue;
+  }
+  
+  public static Object deserializer(Object value, Schema schema) throws IOException{
+    String schemaId = schema.getFullName();      
+    
+    SpecificDatumReader<?> reader = (SpecificDatumReader<?>)readerMap.get(schemaId);
+    if (reader == null) {
+      reader = new SpecificDatumReader(schema);// ignore dirty bits
+      SpecificDatumReader localReader=null;
+      if((localReader=readerMap.putIfAbsent(schemaId, reader))!=null) {
+        reader = localReader;
+      }
+    }
+    
+    // initialize a new decoder; the reuse argument is null, so the cached decoder is not reused
+    BinaryDecoder decoderFromCache = decoders.get();
+    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder((byte[])value, null);
+    // put in threadlocal cache if the initial get was empty
+    if (decoderFromCache==null) {
+      decoders.set(decoder);
+    }
+
+    Object result = reader.read(null, decoder);
+    return result;
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java
new file mode 100644
index 0000000..5afb2e9
--- /dev/null
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE;
+
+import java.nio.ByteBuffer;
+
+import org.apache.avro.util.Utf8;
+
+import me.prettyprint.cassandra.serializers.AbstractSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+
+/**
+ * A CharSequenceSerializer translates a ByteBuffer to and from a CharSequence (read back as an Avro Utf8).
+ */
+public final class CharSequenceSerializer extends AbstractSerializer<CharSequence> {
+
+  private static final CharSequenceSerializer instance = new CharSequenceSerializer();
+
+  public static CharSequenceSerializer get() {
+    return instance;
+  }
+
+  @Override
+  public ByteBuffer toByteBuffer(CharSequence obj) {
+    if (obj == null) {
+      return null;
+    }
+    return StringSerializer.get().toByteBuffer(obj.toString());
+  }
+
+  @Override
+  // TODO: returning CharSequence makes the tests fail because they all expect Utf8; change the return type to CharSequence once the tests are updated.
+  public Utf8 fromByteBuffer(ByteBuffer byteBuffer) {
+    if (byteBuffer == null) {
+      return null;
+    }
+    return new Utf8(StringSerializer.get().fromByteBuffer(byteBuffer));
+  }
+
+  @Override
+  public ComparatorType getComparatorType() {
+    return UTF8TYPE;
+  }
+
+}