You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bi...@apache.org on 2011/12/05 21:05:51 UTC
svn commit: r1210600 [11/16] - in
/incubator/accumulo/trunk/contrib/accumulo_sample: ./ ingest/
ingest/src/main/java/aggregator/ ingest/src/main/java/ingest/
ingest/src/main/java/iterator/ ingest/src/main/java/normalizer/
ingest/src/main/java/protobuf/...
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/QueryLogic.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/QueryLogic.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/QueryLogic.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/QueryLogic.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package logic;
import iterator.EvaluatingIterator;
@@ -51,155 +51,152 @@ import com.google.protobuf.InvalidProtoc
* QueryTable implementation that works with the JEXL grammar. This QueryTable
* uses the metadata, global index, and partitioned table to return
* results based on the query. Example queries:
- *
+ *
* <b>Single Term Query</b>
* 'foo' - looks in global index for foo, and if any entries are found, then the query
* is rewritten to be field1 == 'foo' or field2 == 'foo', etc. This is then passed
* down the optimized query path which uses the intersecting iterators on the shard
* table.
- *
+ *
* <b>Boolean expression</b>
* field == 'foo' - For fielded queries, those that contain a field, an operator, and a literal (string or number),
* the query is parsed and the set of eventFields in the query that are indexed is determined by
* querying the metadata table. Depending on the conjunctions in the query (or, and, not) and the
* eventFields that are indexed, the query may be sent down the optimized path or the full scan path.
- *
+ *
* We are not supporting all of the operators that JEXL supports at this time. We are supporting the following operators:
- *
+ *
* ==, !=, &gt;, &ge;, &lt;, &le;, =~, and !~
- *
+ *
* Custom functions can be created and registered with the Jexl engine. The functions can be used in the queries in conjunction
* with other supported operators. A sample function has been created, called between, and is bound to the 'f' namespace. An
* example using this function is : "f:between(LATITUDE,60.0, 70.0)"
- *
+ *
* <h2>Constraints on Query Structure</h2>
* Queries that are sent to this class need to be formatted such that there is a space on either side of the operator. We are
* rewriting the query in some cases and the current implementation is expecting a space on either side of the operator. Users
* should also be aware that the literals used in the query need to match the data in the table. If an error occurs in the evaluation
* we are skipping the event.
- *
+ *
* <h2>Notes on Optimization</h2>
* Queries that meet any of the following criteria will perform a full scan of the events in the partitioned table:
- *
+ *
* 1. An 'or' conjunction exists in the query but not all of the terms are indexed.
* 2. No indexed terms exist in the query
* 3. An unsupported operator exists in the query
- *
+ *
* </pre>
- *
+ *
*/
public class QueryLogic extends AbstractQueryLogic {
-
- protected static Logger log = Logger.getLogger(QueryLogic.class);
-
- private static String startPartition = "0";
+
+ protected static Logger log = Logger.getLogger(QueryLogic.class);
+
+ private static String startPartition = "0";
+
+ public QueryLogic() {
+ super();
+ }
+
+ @Override
+ protected RangeCalculator getTermIndexInformation(Connector c, Authorizations auths, Multimap<String,Normalizer> indexedTerms,
+ Multimap<String,QueryTerm> terms, String indexTableName, String reverseIndexTableName, String queryString, int queryThreads, Set<String> typeFilter)
+ throws TableNotFoundException, org.apache.commons.jexl2.parser.ParseException {
+ RangeCalculator calc = new RangeCalculator();
+ calc.execute(c, auths, indexedTerms, terms, queryString, this, typeFilter);
+ return calc;
+ }
+
+ protected Collection<Range> getFullScanRange(Date begin, Date end, Multimap<String,QueryTerm> terms) {
+ String startKey = startPartition;
+ String endKey = Integer.toString(this.getNumPartitions());
+ Range r = new Range(startKey, true, endKey, false);
+ return Collections.singletonList(r);
+ }
+
+ @Override
+ protected IndexRanges getTermIndexInformation(Connector c, Authorizations auths, String value, Set<String> typeFilter) throws TableNotFoundException {
+ final String dummyTermName = "DUMMY";
+ UnionIndexRanges indexRanges = new UnionIndexRanges();
- public QueryLogic() {
- super();
+ // The entries in the index are normalized, since we don't have a field, just try using the LcNoDiacriticsNormalizer.
+ String normalizedFieldValue = new LcNoDiacriticsNormalizer().normalizeFieldValue("", value);
+ // Remove the begin and end ' marks
+ if (normalizedFieldValue.startsWith("'") && normalizedFieldValue.endsWith("'")) {
+ normalizedFieldValue = normalizedFieldValue.substring(1, normalizedFieldValue.length() - 1);
}
-
- @Override
- protected RangeCalculator getTermIndexInformation(Connector c,
- Authorizations auths, Multimap<String, Normalizer> indexedTerms, Multimap<String, QueryTerm> terms,
- String indexTableName, String reverseIndexTableName, String queryString, int queryThreads, Set<String> typeFilter)
- throws TableNotFoundException, org.apache.commons.jexl2.parser.ParseException {
- RangeCalculator calc = new RangeCalculator();
- calc.execute(c, auths, indexedTerms, terms, queryString, this, typeFilter);
- return calc;
- }
-
-
- protected Collection<Range> getFullScanRange(Date begin, Date end, Multimap<String, QueryTerm> terms) {
- String startKey = startPartition;
- String endKey = Integer.toString(this.getNumPartitions());
- Range r = new Range(startKey, true, endKey, false);
- return Collections.singletonList(r);
- }
-
-
- @Override
- protected IndexRanges getTermIndexInformation(Connector c, Authorizations auths, String value, Set<String> typeFilter) throws TableNotFoundException {
- final String dummyTermName = "DUMMY";
- UnionIndexRanges indexRanges = new UnionIndexRanges();
-
- //The entries in the index are normalized, since we don't have a field, just try using the LcNoDiacriticsNormalizer.
- String normalizedFieldValue = new LcNoDiacriticsNormalizer().normalizeFieldValue("", value);
- //Remove the begin and end ' marks
- if (normalizedFieldValue.startsWith("'") && normalizedFieldValue.endsWith("'")) {
- normalizedFieldValue = normalizedFieldValue.substring(1, normalizedFieldValue.length() - 1);
- }
- Text fieldValue = new Text(normalizedFieldValue);
- if (log.isDebugEnabled()) {
- log.debug("Querying index table : " + this.getIndexTableName() + " for normalized indexed term: " + fieldValue);
- }
- Scanner scanner = c.createScanner(this.getIndexTableName(), auths);
- Range r = new Range(fieldValue);
- scanner.setRange(r);
- if (log.isDebugEnabled()) {
- log.debug("Range for index query: " + r.toString());
- }
- for (Entry<Key, Value> entry : scanner) {
- if (log.isDebugEnabled()) {
- log.debug("Index entry: " + entry.getKey().toString());
- }
- //Get the shard id and datatype from the colq
- String fieldName = entry.getKey().getColumnFamily().toString();
- String colq = entry.getKey().getColumnQualifier().toString();
- int separator = colq.indexOf(EvaluatingIterator.NULL_BYTE_STRING);
- String shardId = null;
- String datatype = null;
- if (separator != -1) {
- shardId = colq.substring(0, separator);
- datatype = colq.substring(separator + 1);
- } else {
- shardId = colq;
- }
- //Skip this entry if the type is not correct
- if (null != datatype && null != typeFilter && !typeFilter.contains(datatype))
- continue;
- //Parse the UID.List object from the value
- Uid.List uidList = null;
- try {
- uidList = Uid.List.parseFrom(entry.getValue().get());
- } catch (InvalidProtocolBufferException e) {
- //Don't add UID information, at least we know what shards
- //it is located in.
- }
-
- //Add the count for this shard to the total count for the term.
- long count = 0;
- Long storedCount = indexRanges.getTermCardinality().get(dummyTermName);
- if (null == storedCount) {
- count = uidList.getCOUNT();
- } else {
- count = uidList.getCOUNT() + storedCount;
- }
- indexRanges.getTermCardinality().put(dummyTermName, count);
- //Add the field name
- indexRanges.getFieldNamesAndValues().put(fieldName, normalizedFieldValue);
-
- //Create the keys
- Text shard = new Text(shardId);
- if (uidList.getIGNORE()) {
- //Then we create a scan range that is the entire shard
- indexRanges.add(dummyTermName, new Range(shard));
- } else {
- //We should have UUIDs, create event ranges
- for (String uuid : uidList.getUIDList()) {
- Text cf = new Text(datatype);
- TextUtil.textAppend(cf, uuid);
- Key startKey = new Key(shard, cf);
- Key endKey = new Key(shard, new Text(cf.toString() + EvaluatingIterator.NULL_BYTE_STRING));
- Range eventRange = new Range(startKey, true, endKey, false);
- indexRanges.add(dummyTermName, eventRange);
- }
- }
- }
- if (log.isDebugEnabled()) {
- log.debug("Found " + indexRanges.getRanges().size() + " entries in the index for field value: " + normalizedFieldValue);
+ Text fieldValue = new Text(normalizedFieldValue);
+ if (log.isDebugEnabled()) {
+ log.debug("Querying index table : " + this.getIndexTableName() + " for normalized indexed term: " + fieldValue);
+ }
+ Scanner scanner = c.createScanner(this.getIndexTableName(), auths);
+ Range r = new Range(fieldValue);
+ scanner.setRange(r);
+ if (log.isDebugEnabled()) {
+ log.debug("Range for index query: " + r.toString());
+ }
+ for (Entry<Key,Value> entry : scanner) {
+ if (log.isDebugEnabled()) {
+ log.debug("Index entry: " + entry.getKey().toString());
+ }
+ // Get the shard id and datatype from the colq
+ String fieldName = entry.getKey().getColumnFamily().toString();
+ String colq = entry.getKey().getColumnQualifier().toString();
+ int separator = colq.indexOf(EvaluatingIterator.NULL_BYTE_STRING);
+ String shardId = null;
+ String datatype = null;
+ if (separator != -1) {
+ shardId = colq.substring(0, separator);
+ datatype = colq.substring(separator + 1);
+ } else {
+ shardId = colq;
+ }
+ // Skip this entry if the type is not correct
+ if (null != datatype && null != typeFilter && !typeFilter.contains(datatype))
+ continue;
+ // Parse the UID.List object from the value
+ Uid.List uidList = null;
+ try {
+ uidList = Uid.List.parseFrom(entry.getValue().get());
+ } catch (InvalidProtocolBufferException e) {
+ // Don't add UID information, at least we know what shards
+ // it is located in.
+ }
+
+ // Add the count for this shard to the total count for the term.
+ long count = 0;
+ Long storedCount = indexRanges.getTermCardinality().get(dummyTermName);
+ if (null == storedCount) {
+ count = uidList.getCOUNT();
+ } else {
+ count = uidList.getCOUNT() + storedCount;
+ }
+ indexRanges.getTermCardinality().put(dummyTermName, count);
+ // Add the field name
+ indexRanges.getFieldNamesAndValues().put(fieldName, normalizedFieldValue);
+
+ // Create the keys
+ Text shard = new Text(shardId);
+ if (uidList.getIGNORE()) {
+ // Then we create a scan range that is the entire shard
+ indexRanges.add(dummyTermName, new Range(shard));
+ } else {
+ // We should have UUIDs, create event ranges
+ for (String uuid : uidList.getUIDList()) {
+ Text cf = new Text(datatype);
+ TextUtil.textAppend(cf, uuid);
+ Key startKey = new Key(shard, cf);
+ Key endKey = new Key(shard, new Text(cf.toString() + EvaluatingIterator.NULL_BYTE_STRING));
+ Range eventRange = new Range(startKey, true, endKey, false);
+ indexRanges.add(dummyTermName, eventRange);
}
- return indexRanges;
-
+ }
}
-
+ if (log.isDebugEnabled()) {
+ log.debug("Found " + indexRanges.getRanges().size() + " entries in the index for field value: " + normalizedFieldValue);
+ }
+ return indexRanges;
+
+ }
+
}
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/EventFields.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/EventFields.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/EventFields.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/EventFields.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package parser;
import java.nio.ByteBuffer;
@@ -36,189 +36,192 @@ import com.google.common.collect.Multise
import com.google.common.collect.SetMultimap;
/**
- * Object used to hold the fields in an event. This is a multimap because fields can
- * be repeated.
+ * Object used to hold the fields in an event. This is a multimap because fields can be repeated.
*/
-public class EventFields implements SetMultimap<String, FieldValue>, CustomSerialization {
-
- private static boolean kryoInitialized = false;
- private static ArraySerializer valueSerializer = null;
-
- private Multimap<String,FieldValue> map = null;
-
- public static class FieldValue {
- ColumnVisibility visibility;
- byte[] value;
- public FieldValue(ColumnVisibility visibility, byte[] value) {
- super();
- this.visibility = visibility;
- this.value = value;
- }
- public ColumnVisibility getVisibility() {
- return visibility;
- }
- public byte[] getValue() {
- return value;
- }
- public void setVisibility(ColumnVisibility visibility) {
- this.visibility = visibility;
- }
- public void setValue(byte[] value) {
- this.value = value;
- }
-
- public int size() {
- return visibility.flatten().length + value.length;
- }
-
- @Override
- public String toString() {
- StringBuilder buf = new StringBuilder();
- if (null != visibility)
- buf.append(" visibility: ").append(new String(visibility.flatten()));
- if (null != value)
- buf.append(" value size: ").append(value.length);
- if (null != value)
- buf.append(" value: " ).append(new String(value));
- return buf.toString();
- }
-
- }
-
- public EventFields() {
- map = HashMultimap.create();
- }
-
- public int size() {
- return map.size();
- }
-
- public boolean isEmpty() {
- return map.isEmpty();
- }
-
- public boolean containsKey(Object key) {
- return map.containsKey(key);
- }
-
- public boolean containsValue(Object value) {
- return map.containsValue(value);
- }
-
- public boolean containsEntry(Object key, Object value) {
- return map.containsEntry(key, value);
- }
-
- public boolean put(String key, FieldValue value) {
- return map.put(key, value);
- }
-
- public boolean remove(Object key, Object value) {
- return map.remove(key, value);
- }
-
- public boolean putAll(String key, Iterable<? extends FieldValue> values) {
- return map.putAll(key, values);
- }
-
- public boolean putAll(Multimap<? extends String, ? extends FieldValue> multimap) {
- return map.putAll(multimap);
- }
-
- public void clear() {
- map.clear();
- }
-
- public Set<String> keySet() {
- return map.keySet();
- }
-
- public Multiset<String> keys() {
- return map.keys();
- }
-
- public Collection<FieldValue> values() {
- return map.values();
- }
-
- public Set<FieldValue> get(String key) {
- return (Set<FieldValue>) map.get(key);
- }
-
- public Set<FieldValue> removeAll(Object key) {
- return (Set<FieldValue>) map.removeAll(key);
- }
-
- public Set<FieldValue> replaceValues(String key, Iterable<? extends FieldValue> values) {
- return (Set<FieldValue>) map.replaceValues(key, values);
- }
-
- public Set<Entry<String, FieldValue>> entries() {
- return (Set<Entry<String,FieldValue>>) map.entries();
- }
-
- public Map<String, Collection<FieldValue>> asMap() {
- return map.asMap();
- }
-
- public int getByteSize() {
- int count = 0;
- for (Entry<String, FieldValue> e : map.entries()) {
- count += e.getKey().getBytes().length + e.getValue().size();
- }
- return count;
- }
-
- @Override
- public String toString() {
- StringBuilder buf = new StringBuilder();
- for (Entry<String,FieldValue> entry : map.entries()) {
- buf.append("\tkey: ").append(entry.getKey()).append(" -> ").append(entry.getValue().toString()).append("\n");
- }
- return buf.toString();
- }
-
- public static synchronized void initializeKryo(Kryo kryo) {
- if (kryoInitialized)
- return;
- valueSerializer = new ArraySerializer(kryo);
- valueSerializer.setDimensionCount(1);
- valueSerializer.setElementsAreSameType(true);
- valueSerializer.setCanBeNull(false);
- valueSerializer.setElementsCanBeNull(false);
- kryo.register(byte[].class, valueSerializer);
- kryoInitialized = true;
- }
-
- public void readObjectData(Kryo kryo, ByteBuffer buf) {
- if (!kryoInitialized)
- EventFields.initializeKryo(kryo);
- //Read in the number of map entries
- int entries = IntSerializer.get(buf, true);
- for (int i = 0; i < entries; i++) {
- //Read in the key
- String key = StringSerializer.get(buf);
- //Read in the fields in the value
- ColumnVisibility vis = new ColumnVisibility(valueSerializer.readObjectData(buf, byte[].class));
- byte[] value = valueSerializer.readObjectData(buf, byte[].class);
- map.put(key, new FieldValue(vis, value));
- }
-
- }
-
- public void writeObjectData(Kryo kryo, ByteBuffer buf) {
- if (!kryoInitialized)
- EventFields.initializeKryo(kryo);
- //Write out the number of entries;
- IntSerializer.put(buf, map.size(), true);
- for (Entry<String,FieldValue> entry : map.entries()) {
- //Write the key
- StringSerializer.put(buf, entry.getKey());
- //Write the fields in the value
- valueSerializer.writeObjectData(buf, entry.getValue().getVisibility().flatten());
- valueSerializer.writeObjectData(buf, entry.getValue().getValue());
- }
- }
-
-
+public class EventFields implements SetMultimap<String,FieldValue>, CustomSerialization {
+
+ private static boolean kryoInitialized = false;
+ private static ArraySerializer valueSerializer = null;
+
+ private Multimap<String,FieldValue> map = null;
+
+ public static class FieldValue {
+ ColumnVisibility visibility;
+ byte[] value;
+
+ public FieldValue(ColumnVisibility visibility, byte[] value) {
+ super();
+ this.visibility = visibility;
+ this.value = value;
+ }
+
+ public ColumnVisibility getVisibility() {
+ return visibility;
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ public void setVisibility(ColumnVisibility visibility) {
+ this.visibility = visibility;
+ }
+
+ public void setValue(byte[] value) {
+ this.value = value;
+ }
+
+ public int size() {
+ return visibility.flatten().length + value.length;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ if (null != visibility)
+ buf.append(" visibility: ").append(new String(visibility.flatten()));
+ if (null != value)
+ buf.append(" value size: ").append(value.length);
+ if (null != value)
+ buf.append(" value: ").append(new String(value));
+ return buf.toString();
+ }
+
+ }
+
+ public EventFields() {
+ map = HashMultimap.create();
+ }
+
+ public int size() {
+ return map.size();
+ }
+
+ public boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ public boolean containsKey(Object key) {
+ return map.containsKey(key);
+ }
+
+ public boolean containsValue(Object value) {
+ return map.containsValue(value);
+ }
+
+ public boolean containsEntry(Object key, Object value) {
+ return map.containsEntry(key, value);
+ }
+
+ public boolean put(String key, FieldValue value) {
+ return map.put(key, value);
+ }
+
+ public boolean remove(Object key, Object value) {
+ return map.remove(key, value);
+ }
+
+ public boolean putAll(String key, Iterable<? extends FieldValue> values) {
+ return map.putAll(key, values);
+ }
+
+ public boolean putAll(Multimap<? extends String,? extends FieldValue> multimap) {
+ return map.putAll(multimap);
+ }
+
+ public void clear() {
+ map.clear();
+ }
+
+ public Set<String> keySet() {
+ return map.keySet();
+ }
+
+ public Multiset<String> keys() {
+ return map.keys();
+ }
+
+ public Collection<FieldValue> values() {
+ return map.values();
+ }
+
+ public Set<FieldValue> get(String key) {
+ return (Set<FieldValue>) map.get(key);
+ }
+
+ public Set<FieldValue> removeAll(Object key) {
+ return (Set<FieldValue>) map.removeAll(key);
+ }
+
+ public Set<FieldValue> replaceValues(String key, Iterable<? extends FieldValue> values) {
+ return (Set<FieldValue>) map.replaceValues(key, values);
+ }
+
+ public Set<Entry<String,FieldValue>> entries() {
+ return (Set<Entry<String,FieldValue>>) map.entries();
+ }
+
+ public Map<String,Collection<FieldValue>> asMap() {
+ return map.asMap();
+ }
+
+ public int getByteSize() {
+ int count = 0;
+ for (Entry<String,FieldValue> e : map.entries()) {
+ count += e.getKey().getBytes().length + e.getValue().size();
+ }
+ return count;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ for (Entry<String,FieldValue> entry : map.entries()) {
+ buf.append("\tkey: ").append(entry.getKey()).append(" -> ").append(entry.getValue().toString()).append("\n");
+ }
+ return buf.toString();
+ }
+
+ public static synchronized void initializeKryo(Kryo kryo) {
+ if (kryoInitialized)
+ return;
+ valueSerializer = new ArraySerializer(kryo);
+ valueSerializer.setDimensionCount(1);
+ valueSerializer.setElementsAreSameType(true);
+ valueSerializer.setCanBeNull(false);
+ valueSerializer.setElementsCanBeNull(false);
+ kryo.register(byte[].class, valueSerializer);
+ kryoInitialized = true;
+ }
+
+ public void readObjectData(Kryo kryo, ByteBuffer buf) {
+ if (!kryoInitialized)
+ EventFields.initializeKryo(kryo);
+ // Read in the number of map entries
+ int entries = IntSerializer.get(buf, true);
+ for (int i = 0; i < entries; i++) {
+ // Read in the key
+ String key = StringSerializer.get(buf);
+ // Read in the fields in the value
+ ColumnVisibility vis = new ColumnVisibility(valueSerializer.readObjectData(buf, byte[].class));
+ byte[] value = valueSerializer.readObjectData(buf, byte[].class);
+ map.put(key, new FieldValue(vis, value));
+ }
+
+ }
+
+ public void writeObjectData(Kryo kryo, ByteBuffer buf) {
+ if (!kryoInitialized)
+ EventFields.initializeKryo(kryo);
+ // Write out the number of entries;
+ IntSerializer.put(buf, map.size(), true);
+ for (Entry<String,FieldValue> entry : map.entries()) {
+ // Write the key
+ StringSerializer.put(buf, entry.getKey());
+ // Write the fields in the value
+ valueSerializer.writeObjectData(buf, entry.getValue().getVisibility().flatten());
+ valueSerializer.writeObjectData(buf, entry.getValue().getValue());
+ }
+ }
+
}