You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bi...@apache.org on 2011/12/05 21:05:51 UTC

svn commit: r1210600 [11/16] - in /incubator/accumulo/trunk/contrib/accumulo_sample: ./ ingest/ ingest/src/main/java/aggregator/ ingest/src/main/java/ingest/ ingest/src/main/java/iterator/ ingest/src/main/java/normalizer/ ingest/src/main/java/protobuf/...

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/QueryLogic.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/QueryLogic.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/QueryLogic.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/QueryLogic.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package logic;
 
 import iterator.EvaluatingIterator;
@@ -51,155 +51,152 @@ import com.google.protobuf.InvalidProtoc
  * QueryTable implementation that works with the JEXL grammar. This QueryTable
  * uses the metadata, global index, and partitioned table to return
  * results based on the query. Example queries:
- *
+ * 
  *  <b>Single Term Query</b>
  *  'foo' - looks in global index for foo, and if any entries are found, then the query
  *          is rewritten to be field1 == 'foo' or field2 == 'foo', etc. This is then passed
  *          down the optimized query path which uses the intersecting iterators on the shard
  *          table.
- *
+ * 
  *  <b>Boolean expression</b>        
  *  field == 'foo' - For fielded queries, those that contain a field, an operator, and a literal (string or number),
  *                   the query is parsed and the set of eventFields in the query that are indexed is determined by
  *                   querying the metadata table. Depending on the conjunctions in the query (or, and, not) and the
  *                   eventFields that are indexed, the query may be sent down the optimized path or the full scan path.
- *
+ * 
  *  We are not supporting all of the operators that JEXL supports at this time. We are supporting the following operators:
- *
+ * 
  *  ==, !=, &gt;, &ge;, &lt;, &le;, =~, and !~
- *
+ * 
  *  Custom functions can be created and registered with the Jexl engine. The functions can be used in the queries in conjunction
  *  with other supported operators. A sample function has been created, called between, and is bound to the 'f' namespace. An
  *  example using this function is : "f:between(LATITUDE,60.0, 70.0)"
- *
+ * 
  *  <h2>Constraints on Query Structure</h2>
  *  Queries that are sent to this class need to be formatted such that there is a space on either side of the operator. We are
  *  rewriting the query in some cases and the current implementation is expecting a space on either side of the operator. Users
  *  should also be aware that the literals used in the query need to match the data in the table. If an error occurs in the evaluation 
  *  we are skipping the event.
- *
+ * 
  *  <h2>Notes on Optimization</h2>
  *  Queries that meet any of the following criteria will perform a full scan of the events in the partitioned table:
- *
+ * 
  *  1. An 'or' conjunction exists in the query but not all of the terms are indexed.
  *  2. No indexed terms exist in the query
  *  3. An unsupported operator exists in the query
- *
+ * 
  * </pre>
- *
+ * 
  */
 public class QueryLogic extends AbstractQueryLogic {
-
-    protected static Logger log = Logger.getLogger(QueryLogic.class);
-
-    private static String startPartition = "0";
+  
+  protected static Logger log = Logger.getLogger(QueryLogic.class);
+  
+  private static String startPartition = "0";
+  
+  public QueryLogic() {
+    super();
+  }
+  
+  @Override
+  protected RangeCalculator getTermIndexInformation(Connector c, Authorizations auths, Multimap<String,Normalizer> indexedTerms,
+      Multimap<String,QueryTerm> terms, String indexTableName, String reverseIndexTableName, String queryString, int queryThreads, Set<String> typeFilter)
+      throws TableNotFoundException, org.apache.commons.jexl2.parser.ParseException {
+    RangeCalculator calc = new RangeCalculator();
+    calc.execute(c, auths, indexedTerms, terms, queryString, this, typeFilter);
+    return calc;
+  }
+  
+  protected Collection<Range> getFullScanRange(Date begin, Date end, Multimap<String,QueryTerm> terms) {
+    String startKey = startPartition;
+    String endKey = Integer.toString(this.getNumPartitions());
+    Range r = new Range(startKey, true, endKey, false);
+    return Collections.singletonList(r);
+  }
+  
+  @Override
+  protected IndexRanges getTermIndexInformation(Connector c, Authorizations auths, String value, Set<String> typeFilter) throws TableNotFoundException {
+    final String dummyTermName = "DUMMY";
+    UnionIndexRanges indexRanges = new UnionIndexRanges();
     
-    public QueryLogic() {
-        super();
+    // The entries in the index are normalized, since we don't have a field, just try using the LcNoDiacriticsNormalizer.
+    String normalizedFieldValue = new LcNoDiacriticsNormalizer().normalizeFieldValue("", value);
+    // Remove the begin and end ' marks
+    if (normalizedFieldValue.startsWith("'") && normalizedFieldValue.endsWith("'")) {
+      normalizedFieldValue = normalizedFieldValue.substring(1, normalizedFieldValue.length() - 1);
     }
-    
-    @Override
-	protected RangeCalculator getTermIndexInformation(Connector c,
-			Authorizations auths, Multimap<String, Normalizer> indexedTerms, Multimap<String, QueryTerm> terms, 
-			String indexTableName, String reverseIndexTableName, String queryString, int queryThreads, Set<String> typeFilter)
-			throws TableNotFoundException, org.apache.commons.jexl2.parser.ParseException {
-    	RangeCalculator calc = new RangeCalculator();
-    	calc.execute(c, auths, indexedTerms, terms, queryString, this, typeFilter);
-    	return calc;
-	}
-
-
-	protected Collection<Range> getFullScanRange(Date begin, Date end, Multimap<String, QueryTerm> terms) {
-        String startKey = startPartition;
-        String endKey = Integer.toString(this.getNumPartitions());
-        Range r = new Range(startKey, true, endKey, false);
-        return Collections.singletonList(r);
-	}
-
-
-    @Override
-    protected IndexRanges getTermIndexInformation(Connector c, Authorizations auths, String value, Set<String> typeFilter) throws TableNotFoundException {
-        final String dummyTermName = "DUMMY";
-        UnionIndexRanges indexRanges = new UnionIndexRanges();
-
-        //The entries in the index are normalized, since we don't have a field, just try using the LcNoDiacriticsNormalizer.
-        String normalizedFieldValue = new LcNoDiacriticsNormalizer().normalizeFieldValue("", value);
-        //Remove the begin and end ' marks
-        if (normalizedFieldValue.startsWith("'") && normalizedFieldValue.endsWith("'")) {
-            normalizedFieldValue = normalizedFieldValue.substring(1, normalizedFieldValue.length() - 1);
-        }
-        Text fieldValue = new Text(normalizedFieldValue);
-        if (log.isDebugEnabled()) {
-            log.debug("Querying index table : " + this.getIndexTableName() + " for normalized indexed term: " + fieldValue);
-        }
-        Scanner scanner = c.createScanner(this.getIndexTableName(), auths);
-        Range r = new Range(fieldValue);
-        scanner.setRange(r);
-        if (log.isDebugEnabled()) {
-            log.debug("Range for index query: " + r.toString());
-        }
-        for (Entry<Key, Value> entry : scanner) {
-            if (log.isDebugEnabled()) {
-                log.debug("Index entry: " + entry.getKey().toString());
-            }
-            //Get the shard id and datatype from the colq
-            String fieldName = entry.getKey().getColumnFamily().toString();
-            String colq = entry.getKey().getColumnQualifier().toString();
-            int separator = colq.indexOf(EvaluatingIterator.NULL_BYTE_STRING);
-            String shardId = null;
-            String datatype = null;
-            if (separator != -1) {
-                shardId = colq.substring(0, separator);
-                datatype = colq.substring(separator + 1);
-            } else {
-                shardId = colq;
-            }
-            //Skip this entry if the type is not correct
-            if (null != datatype && null != typeFilter && !typeFilter.contains(datatype))
-            	continue;
-            //Parse the UID.List object from the value
-            Uid.List uidList = null;
-            try {
-                uidList = Uid.List.parseFrom(entry.getValue().get());
-            } catch (InvalidProtocolBufferException e) {
-                //Don't add UID information, at least we know what shards
-                //it is located in.
-            }
-
-            //Add the count for this shard to the total count for the term.
-            long count = 0;
-            Long storedCount = indexRanges.getTermCardinality().get(dummyTermName);
-            if (null == storedCount) {
-                count = uidList.getCOUNT();
-            } else {
-                count = uidList.getCOUNT() + storedCount;
-            }
-            indexRanges.getTermCardinality().put(dummyTermName, count);
-            //Add the field name
-            indexRanges.getFieldNamesAndValues().put(fieldName, normalizedFieldValue);
-
-            //Create the keys
-            Text shard = new Text(shardId);
-            if (uidList.getIGNORE()) {
-                //Then we create a scan range that is the entire shard
-                indexRanges.add(dummyTermName, new Range(shard));
-            } else {
-                //We should have UUIDs, create event ranges
-                for (String uuid : uidList.getUIDList()) {
-                    Text cf = new Text(datatype);
-                    TextUtil.textAppend(cf, uuid);
-                    Key startKey = new Key(shard, cf);
-                    Key endKey = new Key(shard, new Text(cf.toString() + EvaluatingIterator.NULL_BYTE_STRING));
-                    Range eventRange = new Range(startKey, true, endKey, false);
-                    indexRanges.add(dummyTermName, eventRange);
-                }
-            }
-        }
-        if (log.isDebugEnabled()) {
-            log.debug("Found " + indexRanges.getRanges().size() + " entries in the index for field value: " + normalizedFieldValue);
+    Text fieldValue = new Text(normalizedFieldValue);
+    if (log.isDebugEnabled()) {
+      log.debug("Querying index table : " + this.getIndexTableName() + " for normalized indexed term: " + fieldValue);
+    }
+    Scanner scanner = c.createScanner(this.getIndexTableName(), auths);
+    Range r = new Range(fieldValue);
+    scanner.setRange(r);
+    if (log.isDebugEnabled()) {
+      log.debug("Range for index query: " + r.toString());
+    }
+    for (Entry<Key,Value> entry : scanner) {
+      if (log.isDebugEnabled()) {
+        log.debug("Index entry: " + entry.getKey().toString());
+      }
+      // Get the shard id and datatype from the colq
+      String fieldName = entry.getKey().getColumnFamily().toString();
+      String colq = entry.getKey().getColumnQualifier().toString();
+      int separator = colq.indexOf(EvaluatingIterator.NULL_BYTE_STRING);
+      String shardId = null;
+      String datatype = null;
+      if (separator != -1) {
+        shardId = colq.substring(0, separator);
+        datatype = colq.substring(separator + 1);
+      } else {
+        shardId = colq;
+      }
+      // Skip this entry if the type is not correct
+      if (null != datatype && null != typeFilter && !typeFilter.contains(datatype))
+        continue;
+      // Parse the UID.List object from the value
+      Uid.List uidList = null;
+      try {
+        uidList = Uid.List.parseFrom(entry.getValue().get());
+      } catch (InvalidProtocolBufferException e) {
+        // Don't add UID information, at least we know what shards
+        // it is located in.
+      }
+      
+      // Add the count for this shard to the total count for the term.
+      long count = 0;
+      Long storedCount = indexRanges.getTermCardinality().get(dummyTermName);
+      if (null == storedCount) {
+        count = uidList.getCOUNT();
+      } else {
+        count = uidList.getCOUNT() + storedCount;
+      }
+      indexRanges.getTermCardinality().put(dummyTermName, count);
+      // Add the field name
+      indexRanges.getFieldNamesAndValues().put(fieldName, normalizedFieldValue);
+      
+      // Create the keys
+      Text shard = new Text(shardId);
+      if (uidList.getIGNORE()) {
+        // Then we create a scan range that is the entire shard
+        indexRanges.add(dummyTermName, new Range(shard));
+      } else {
+        // We should have UUIDs, create event ranges
+        for (String uuid : uidList.getUIDList()) {
+          Text cf = new Text(datatype);
+          TextUtil.textAppend(cf, uuid);
+          Key startKey = new Key(shard, cf);
+          Key endKey = new Key(shard, new Text(cf.toString() + EvaluatingIterator.NULL_BYTE_STRING));
+          Range eventRange = new Range(startKey, true, endKey, false);
+          indexRanges.add(dummyTermName, eventRange);
         }
-        return indexRanges;
-
+      }
     }
-
+    if (log.isDebugEnabled()) {
+      log.debug("Found " + indexRanges.getRanges().size() + " entries in the index for field value: " + normalizedFieldValue);
+    }
+    return indexRanges;
+    
+  }
+  
 }

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/EventFields.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/EventFields.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/EventFields.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/EventFields.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package parser;
 
 import java.nio.ByteBuffer;
@@ -36,189 +36,192 @@ import com.google.common.collect.Multise
 import com.google.common.collect.SetMultimap;
 
 /**
- * Object used to hold the fields in an event. This is a multimap because fields can
- * be repeated. 
+ * Object used to hold the fields in an event. This is a multimap because fields can be repeated.
  */
-public class EventFields implements SetMultimap<String, FieldValue>, CustomSerialization {
-	
-	private static boolean kryoInitialized = false;
-	private static ArraySerializer valueSerializer = null;
-	
-	private Multimap<String,FieldValue> map = null;
-	
-	public static class FieldValue {
-		ColumnVisibility visibility;
-		byte[] value;
-		public FieldValue(ColumnVisibility visibility, byte[] value) {
-			super();
-			this.visibility = visibility;
-			this.value = value;
-		}
-		public ColumnVisibility getVisibility() {
-			return visibility;
-		}
-		public byte[] getValue() {
-			return value;
-		}
-		public void setVisibility(ColumnVisibility visibility) {
-			this.visibility = visibility;
-		}
-		public void setValue(byte[] value) {
-			this.value = value;
-		}
-		
-		public int size() {
-			return visibility.flatten().length +  value.length;
-		}
-		
-		@Override
-		public String toString() {
-			StringBuilder buf = new StringBuilder();
-			if (null != visibility)
-				buf.append(" visibility: ").append(new String(visibility.flatten()));
-			if (null != value)
-				buf.append(" value size: ").append(value.length);
-			if (null != value)
-				buf.append(" value: " ).append(new String(value));
-			return buf.toString();
-		}
-		
-	}
-	
-	public EventFields() {
-		map = HashMultimap.create();
-	}
-
-	public int size() {
-		return map.size();
-	}
-
-	public boolean isEmpty() {
-		return map.isEmpty();
-	}
-
-	public boolean containsKey(Object key) {
-		return map.containsKey(key);
-	}
-
-	public boolean containsValue(Object value) {
-		return map.containsValue(value);
-	}
-
-	public boolean containsEntry(Object key, Object value) {
-		return map.containsEntry(key, value);
-	}
-
-	public boolean put(String key, FieldValue value) {
-		return map.put(key, value);
-	}
-
-	public boolean remove(Object key, Object value) {
-		return map.remove(key, value);
-	}
-
-	public boolean putAll(String key, Iterable<? extends FieldValue> values) {
-		return map.putAll(key, values);
-	}
-
-	public boolean putAll(Multimap<? extends String, ? extends FieldValue> multimap) {
-		return map.putAll(multimap);
-	}
-
-	public void clear() {
-		map.clear();
-	}
-
-	public Set<String> keySet() {
-		return map.keySet();
-	}
-
-	public Multiset<String> keys() {
-		return map.keys();
-	}
-
-	public Collection<FieldValue> values() {
-		return map.values();
-	}
-
-	public Set<FieldValue> get(String key) {
-		return (Set<FieldValue>) map.get(key);
-	}
-
-	public Set<FieldValue> removeAll(Object key) {
-		return (Set<FieldValue>) map.removeAll(key);
-	}
-
-	public Set<FieldValue> replaceValues(String key, Iterable<? extends FieldValue> values) {
-		return (Set<FieldValue>) map.replaceValues(key, values);
-	}
-
-	public Set<Entry<String, FieldValue>> entries() {
-		return (Set<Entry<String,FieldValue>>) map.entries();
-	}
-
-	public Map<String, Collection<FieldValue>> asMap() {
-		return map.asMap();
-	}
-
-	public int getByteSize() {
-		 int count = 0;
-		for (Entry<String, FieldValue> e : map.entries()) {
-			count += e.getKey().getBytes().length + e.getValue().size();
-		}
-		return count;
-	}
-	
-	@Override
-	public String toString() {
-		StringBuilder buf = new StringBuilder();
-		for (Entry<String,FieldValue> entry : map.entries()) {
-			buf.append("\tkey: ").append(entry.getKey()).append(" -> ").append(entry.getValue().toString()).append("\n");
-		}
-		return buf.toString();
-	}
-	
-	public static synchronized void initializeKryo(Kryo kryo) {
-		if (kryoInitialized)
-			return;
-		valueSerializer = new ArraySerializer(kryo);
-		valueSerializer.setDimensionCount(1);
-		valueSerializer.setElementsAreSameType(true);
-		valueSerializer.setCanBeNull(false);
-		valueSerializer.setElementsCanBeNull(false);
-		kryo.register(byte[].class, valueSerializer);
-		kryoInitialized = true;
-	}
-	
-	public void readObjectData(Kryo kryo, ByteBuffer buf) {
-		if (!kryoInitialized)
-			EventFields.initializeKryo(kryo);
-		//Read in the number of map entries
-		int entries = IntSerializer.get(buf, true);
-		for (int i = 0; i < entries; i++) {
-			//Read in the key
-			String key = StringSerializer.get(buf);
-			//Read in the fields in the value
-			ColumnVisibility vis = new ColumnVisibility(valueSerializer.readObjectData(buf, byte[].class));
-			byte[] value = valueSerializer.readObjectData(buf, byte[].class);
-			map.put(key, new FieldValue(vis, value));
-		}
-		
-	}
-
-	public void writeObjectData(Kryo kryo, ByteBuffer buf) {
-		if (!kryoInitialized)
-			EventFields.initializeKryo(kryo);
-		//Write out the number of entries;
-		IntSerializer.put(buf, map.size(), true);
-		for (Entry<String,FieldValue> entry : map.entries()) {
-			//Write the key
-			StringSerializer.put(buf, entry.getKey());
-			//Write the fields in the value
-			valueSerializer.writeObjectData(buf, entry.getValue().getVisibility().flatten());
-			valueSerializer.writeObjectData(buf, entry.getValue().getValue());
-		}
-	}
-
-	
+public class EventFields implements SetMultimap<String,FieldValue>, CustomSerialization {
+  
+  private static boolean kryoInitialized = false;
+  private static ArraySerializer valueSerializer = null;
+  
+  private Multimap<String,FieldValue> map = null;
+  
+  public static class FieldValue {
+    ColumnVisibility visibility;
+    byte[] value;
+    
+    public FieldValue(ColumnVisibility visibility, byte[] value) {
+      super();
+      this.visibility = visibility;
+      this.value = value;
+    }
+    
+    public ColumnVisibility getVisibility() {
+      return visibility;
+    }
+    
+    public byte[] getValue() {
+      return value;
+    }
+    
+    public void setVisibility(ColumnVisibility visibility) {
+      this.visibility = visibility;
+    }
+    
+    public void setValue(byte[] value) {
+      this.value = value;
+    }
+    
+    public int size() {
+      return visibility.flatten().length + value.length;
+    }
+    
+    @Override
+    public String toString() {
+      StringBuilder buf = new StringBuilder();
+      if (null != visibility)
+        buf.append(" visibility: ").append(new String(visibility.flatten()));
+      if (null != value)
+        buf.append(" value size: ").append(value.length);
+      if (null != value)
+        buf.append(" value: ").append(new String(value));
+      return buf.toString();
+    }
+    
+  }
+  
+  public EventFields() {
+    map = HashMultimap.create();
+  }
+  
+  public int size() {
+    return map.size();
+  }
+  
+  public boolean isEmpty() {
+    return map.isEmpty();
+  }
+  
+  public boolean containsKey(Object key) {
+    return map.containsKey(key);
+  }
+  
+  public boolean containsValue(Object value) {
+    return map.containsValue(value);
+  }
+  
+  public boolean containsEntry(Object key, Object value) {
+    return map.containsEntry(key, value);
+  }
+  
+  public boolean put(String key, FieldValue value) {
+    return map.put(key, value);
+  }
+  
+  public boolean remove(Object key, Object value) {
+    return map.remove(key, value);
+  }
+  
+  public boolean putAll(String key, Iterable<? extends FieldValue> values) {
+    return map.putAll(key, values);
+  }
+  
+  public boolean putAll(Multimap<? extends String,? extends FieldValue> multimap) {
+    return map.putAll(multimap);
+  }
+  
+  public void clear() {
+    map.clear();
+  }
+  
+  public Set<String> keySet() {
+    return map.keySet();
+  }
+  
+  public Multiset<String> keys() {
+    return map.keys();
+  }
+  
+  public Collection<FieldValue> values() {
+    return map.values();
+  }
+  
+  public Set<FieldValue> get(String key) {
+    return (Set<FieldValue>) map.get(key);
+  }
+  
+  public Set<FieldValue> removeAll(Object key) {
+    return (Set<FieldValue>) map.removeAll(key);
+  }
+  
+  public Set<FieldValue> replaceValues(String key, Iterable<? extends FieldValue> values) {
+    return (Set<FieldValue>) map.replaceValues(key, values);
+  }
+  
+  public Set<Entry<String,FieldValue>> entries() {
+    return (Set<Entry<String,FieldValue>>) map.entries();
+  }
+  
+  public Map<String,Collection<FieldValue>> asMap() {
+    return map.asMap();
+  }
+  
+  public int getByteSize() {
+    int count = 0;
+    for (Entry<String,FieldValue> e : map.entries()) {
+      count += e.getKey().getBytes().length + e.getValue().size();
+    }
+    return count;
+  }
+  
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    for (Entry<String,FieldValue> entry : map.entries()) {
+      buf.append("\tkey: ").append(entry.getKey()).append(" -> ").append(entry.getValue().toString()).append("\n");
+    }
+    return buf.toString();
+  }
+  
+  public static synchronized void initializeKryo(Kryo kryo) {
+    if (kryoInitialized)
+      return;
+    valueSerializer = new ArraySerializer(kryo);
+    valueSerializer.setDimensionCount(1);
+    valueSerializer.setElementsAreSameType(true);
+    valueSerializer.setCanBeNull(false);
+    valueSerializer.setElementsCanBeNull(false);
+    kryo.register(byte[].class, valueSerializer);
+    kryoInitialized = true;
+  }
+  
+  public void readObjectData(Kryo kryo, ByteBuffer buf) {
+    if (!kryoInitialized)
+      EventFields.initializeKryo(kryo);
+    // Read in the number of map entries
+    int entries = IntSerializer.get(buf, true);
+    for (int i = 0; i < entries; i++) {
+      // Read in the key
+      String key = StringSerializer.get(buf);
+      // Read in the fields in the value
+      ColumnVisibility vis = new ColumnVisibility(valueSerializer.readObjectData(buf, byte[].class));
+      byte[] value = valueSerializer.readObjectData(buf, byte[].class);
+      map.put(key, new FieldValue(vis, value));
+    }
+    
+  }
+  
+  public void writeObjectData(Kryo kryo, ByteBuffer buf) {
+    if (!kryoInitialized)
+      EventFields.initializeKryo(kryo);
+    // Write out the number of entries;
+    IntSerializer.put(buf, map.size(), true);
+    for (Entry<String,FieldValue> entry : map.entries()) {
+      // Write the key
+      StringSerializer.put(buf, entry.getKey());
+      // Write the fields in the value
+      valueSerializer.writeObjectData(buf, entry.getValue().getVisibility().flatten());
+      valueSerializer.writeObjectData(buf, entry.getValue().getValue());
+    }
+  }
+  
 }