You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/10/31 04:25:14 UTC
[02/10] git commit: ACCUMULO-1783 Add in the implementation for getMutations for AccumuloStorage.
ACCUMULO-1783 Add in the implementation for getMutations for AccumuloStorage.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/74c01ec2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/74c01ec2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/74c01ec2
Branch: refs/heads/ACCUMULO-1783
Commit: 74c01ec215ee7c50fd75c16d606fc8c073f25c8f
Parents: 294f9ce
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 24 12:19:29 2013 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 24 12:19:29 2013 -0700
----------------------------------------------------------------------
pom.xml | 5 +
.../accumulo/pig/AbstractAccumuloStorage.java | 186 +++++++++++++++++--
.../apache/accumulo/pig/AccumuloKVStorage.java | 150 ---------------
.../apache/accumulo/pig/AccumuloStorage.java | 134 +++++++++++++
4 files changed, 312 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/74c01ec2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 249dcce..630d5e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,11 @@
<artifactId>joda-time</artifactId>
<version>1.6</version>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>15.0</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/74c01ec2/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 0424b8a..494fd72 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -17,10 +17,13 @@
package org.apache.accumulo.pig;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
@@ -31,6 +34,7 @@ import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -42,12 +46,19 @@ import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadStoreCaster;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
+import org.joda.time.DateTime;
/**
* A LoadStoreFunc for retrieving data from and storing data to Accumulo
@@ -61,6 +72,8 @@ import org.apache.pig.impl.util.UDFContext;
public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface {
private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
+ private static final String COLON = ":", COMMA = ",";
+
private Configuration conf;
private RecordReader<Key,Value> reader;
private RecordWriter<Text,Mutation> writer;
@@ -81,7 +94,9 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
int maxWriteThreads = 10;
long maxMutationBufferSize = 10 * 1000 * 1000;
int maxLatency = 10 * 1000;
-
+
+ protected LoadStoreCaster caster;
+ protected ResourceSchema schema;
protected String contextSignature = null;
public AbstractAccumuloStorage() {}
@@ -118,7 +133,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
private void setLocationFromUri(String location) throws IOException {
// ex:
- // accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&start=abc&end=z
+ // accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&fetch_columns=col1:cq1,col2:cq2&start=abc&end=z
String columns = "";
try {
if (!location.startsWith("accumulo://"))
@@ -137,7 +152,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
zookeepers = pair[1];
else if (pair[0].equals("auths"))
auths = pair[1];
- else if (pair[0].equals("columns"))
+ else if (pair[0].equals("fetch_columns"))
columns = pair[1];
else if (pair[0].equals("start"))
start = pair[1];
@@ -158,13 +173,13 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
if (auths == null || auths.equals("")) {
authorizations = new Authorizations();
} else {
- authorizations = new Authorizations(auths.split(","));
+ authorizations = new Authorizations(auths.split(COMMA));
}
- if (!columns.equals("")) {
- for (String cfCq : columns.split(",")) {
- if (cfCq.contains("|")) {
- String[] c = cfCq.split("\\|");
+ if (!StringUtils.isEmpty(columns)) {
+ for (String cfCq : columns.split(COMMA)) {
+ if (cfCq.contains(COLON)) {
+ String[] c = cfCq.split(COLON);
columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text(c[0]), new Text(c[1])));
} else {
columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text(cfCq), null));
@@ -175,7 +190,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
} catch (Exception e) {
throw new IOException(
"Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&"
- + "[start=startRow,end=endRow,columns=[cf1|cq1,cf2|cq2,...],write_buffer_size_bytes=10000000,write_threads=10,write_latency_ms=30000]]': "
+ + "[start=startRow,end=endRow,fetch_columns=[cf1:cq1,cf2:cq2,...],write_buffer_size_bytes=10000000,write_threads=10,write_latency_ms=30000]]': "
+ e.getMessage());
}
}
@@ -255,10 +270,6 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
return new AccumuloOutputFormat();
}
- public void checkSchema(ResourceSchema schema) throws IOException {
- // we don't care about types, they all get casted to ByteBuffers
- }
-
@SuppressWarnings({"rawtypes", "unchecked"})
public void prepareToWrite(RecordWriter writer) {
this.writer = writer;
@@ -280,4 +291,153 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
public void cleanupOnFailure(String failure, Job job) {}
public void cleanupOnSuccess(String location, Job job) {}
+
+  /**
+   * Records the schema Pig will write with, after verifying that a caster is available to
+   * serialize field values.
+   *
+   * <p>The schema is serialized into the UDF properties under {@code <contextSignature>_schema}.
+   *
+   * @param s the schema of the tuples to be stored
+   * @throws IOException if no {@link LoadStoreCaster} has been configured
+   */
+  @Override
+  public void checkSchema(ResourceSchema s) throws IOException {
+    // caster is declared as a LoadStoreCaster, so the only way it can be unusable is if it was never
+    // assigned. Check that directly: the previous instanceof test could only fail for null, and then
+    // caster.getClass() threw a NullPointerException instead of the intended IOException.
+    if (null == caster) {
+      LOG.error("Caster must implement LoadStoreCaster for writing to Accumulo.");
+      throw new IOException("Bad Caster: no LoadStoreCaster was configured");
+    }
+    schema = s;
+    getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
+  }
+
+
+ protected Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+ Object o = tuple.get(i);
+ byte type = schemaToType(o, i, fieldSchemas);
+
+ return objToText(o, type);
+ }
+
+  /**
+   * Serializes a single value into a {@link Text}, typed by {@code fieldSchema} when it is
+   * non-null and by the value's runtime type otherwise.
+   */
+  protected Text objectToText(Object o, ResourceFieldSchema fieldSchema) throws IOException {
+    return objToText(o, schemaToType(o, fieldSchema));
+  }
+
+  /**
+   * Returns the Pig {@link DataType} code for {@code o}: the declared type from {@code fieldSchema}
+   * when one is given, otherwise the type found by {@link DataType#findType(Object)}.
+   */
+  protected byte schemaToType(Object o, ResourceFieldSchema fieldSchema) {
+    if (fieldSchema != null) {
+      return fieldSchema.getType();
+    }
+    return DataType.findType(o);
+  }
+
+  /**
+   * Returns the Pig {@link DataType} code for field {@code i}: the declared type
+   * {@code fieldSchemas[i]} when a schema array is given, otherwise the type found by
+   * {@link DataType#findType(Object)} for {@code o}.
+   */
+  protected byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
+    if (fieldSchemas != null) {
+      return fieldSchemas[i].getType();
+    }
+    return DataType.findType(o);
+  }
+
+ protected byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+ Object o = tuple.get(i);
+ byte type = schemaToType(o, i, fieldSchemas);
+
+ return objToBytes(o, type);
+
+ }
+
+  /**
+   * Reads field {@code i} of {@code tuple} as a long, converting from the field's schema type, or
+   * from its runtime type when no schema is known.
+   *
+   * <p>Float and double values are truncated. BigInteger and BigDecimal values that are not exactly
+   * representable as a long are converted anyway, with a warning logged.
+   *
+   * @throws IOException if the field is null, is a chararray/bytearray that does not parse as a
+   *           long, or has a type that cannot be converted to a long
+   */
+  protected long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+    Object o = tuple.get(i);
+
+    // A null field has no long value. Fail with a descriptive IOException instead of a
+    // NullPointerException from unboxing or from o.getClass() in the default branch.
+    if (null == o) {
+      final String msg = "Could not convert null field at index " + i + " into long";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+
+    byte type = schemaToType(o, i, fieldSchemas);
+
+    switch (type) {
+      case DataType.LONG:
+        return (Long) o;
+      case DataType.CHARARRAY:
+        String timestampString = (String) o;
+        try {
+          return Long.parseLong(timestampString);
+        } catch (NumberFormatException e) {
+          final String msg = "Could not cast chararray into long: " + timestampString;
+          LOG.error(msg);
+          throw new IOException(msg, e);
+        }
+      case DataType.DOUBLE:
+        Double doubleTimestamp = (Double) o;
+        return doubleTimestamp.longValue();
+      case DataType.FLOAT:
+        Float floatTimestamp = (Float) o;
+        return floatTimestamp.longValue();
+      case DataType.INTEGER:
+        Integer intTimestamp = (Integer) o;
+        return intTimestamp.longValue();
+      case DataType.BIGINTEGER:
+        BigInteger bigintTimestamp = (BigInteger) o;
+        long longTimestamp = bigintTimestamp.longValue();
+
+        // longValue() silently keeps only the low-order 64 bits; detect and report that
+        BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp);
+
+        if (!recreatedTimestamp.equals(bigintTimestamp)) {
+          LOG.warn("Downcasting BigInteger into Long results in a change of the original value. Was " + bigintTimestamp + " but is now " + longTimestamp);
+        }
+
+        return longTimestamp;
+      case DataType.BIGDECIMAL:
+        BigDecimal bigdecimalTimestamp = (BigDecimal) o;
+        try {
+          return bigdecimalTimestamp.longValueExact();
+        } catch (ArithmeticException e) {
+          // Fractional or out-of-range: fall back to the lossy conversion and warn
+          long convertedLong = bigdecimalTimestamp.longValue();
+          LOG.warn("Downcasting BigDecimal into Long results in a loss of information. Was " + bigdecimalTimestamp + " but is now " + convertedLong);
+          return convertedLong;
+        }
+      case DataType.BYTEARRAY:
+        DataByteArray bytes = (DataByteArray) o;
+        try {
+          return Long.parseLong(bytes.toString());
+        } catch (NumberFormatException e) {
+          final String msg = "Could not cast bytes into long: " + bytes.toString();
+          LOG.error(msg);
+          throw new IOException(msg, e);
+        }
+      default:
+        LOG.error("Could not convert " + o + " of class " + o.getClass() + " into long.");
+        throw new IOException("Could not convert " + o.getClass() + " into long");
+    }
+  }
+
+ protected Text objToText(Object o, byte type) throws IOException {
+ return new Text(objToBytes(o, type));
+ }
+
+  /**
+   * Serializes a Pig value to bytes according to the given Pig {@link DataType} code.
+   *
+   * <p>{@link DataByteArray} values are returned as their backing bytes; every other supported
+   * type is delegated to {@link #caster}.
+   *
+   * @param o the value to serialize; {@code null} yields {@code null}
+   * @param type the Pig type code to serialize {@code o} as
+   * @return the serialized bytes, or {@code null} for a null value or {@link DataType#NULL}
+   * @throws IOException for {@link DataType#ERROR} or a type with no converter
+   */
+  protected byte[] objToBytes(Object o, byte type) throws IOException {
+    if (null == o) {
+      return null;
+    }
+
+    switch (type) {
+      // Values that need no conversion, or cannot be converted at all
+      case DataType.NULL:
+        return null;
+      case DataType.BYTEARRAY:
+        return ((DataByteArray) o).get();
+      case DataType.ERROR:
+        throw new IOException("Unable to determine type of " + o.getClass());
+
+      // Scalar types
+      case DataType.CHARARRAY:
+        return caster.toBytes((String) o);
+      case DataType.BOOLEAN:
+        return caster.toBytes((Boolean) o);
+      case DataType.INTEGER:
+        return caster.toBytes((Integer) o);
+      case DataType.LONG:
+        return caster.toBytes((Long) o);
+      case DataType.FLOAT:
+        return caster.toBytes((Float) o);
+      case DataType.DOUBLE:
+        return caster.toBytes((Double) o);
+      case DataType.BIGINTEGER:
+        return caster.toBytes((BigInteger) o);
+      case DataType.BIGDECIMAL:
+        return caster.toBytes((BigDecimal) o);
+      case DataType.DATETIME:
+        return caster.toBytes((DateTime) o);
+
+      // Complex types
+      case DataType.TUPLE:
+        return caster.toBytes((Tuple) o);
+      case DataType.BAG:
+        return caster.toBytes((DataBag) o);
+      case DataType.MAP: {
+        // Unchecked: relies on DataType.findType (or the declared schema) reporting MAP only for
+        // Map<String,Object> values.
+        @SuppressWarnings("unchecked")
+        Map<String,Object> map = (Map<String,Object>) o;
+        return caster.toBytes(map);
+      }
+
+      default:
+        throw new IOException("Unable to find a converter for tuple field " + o);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/74c01ec2/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
index 8a17e8b..8462985 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
@@ -17,11 +17,8 @@
package org.apache.accumulo.pig;
import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
import java.util.Collection;
import java.util.Collections;
-import java.util.Map;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -30,18 +27,12 @@ import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
-import org.apache.pig.LoadStoreCaster;
-import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.Utf8StorageConverter;
-import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.util.ObjectSerializer;
-import org.joda.time.DateTime;
/**
* A LoadStoreFunc for retrieving data from and storing data to Accumulo.
@@ -60,9 +51,6 @@ import org.joda.time.DateTime;
*/
public class AccumuloKVStorage extends AbstractAccumuloStorage {
private static final Log LOG = LogFactory.getLog(AccumuloKVStorage.class);
- protected LoadStoreCaster caster;
-
- private ResourceSchema schema;
public AccumuloKVStorage() {
this.caster = new Utf8StorageConverter();
@@ -129,142 +117,4 @@ public class AccumuloKVStorage extends AbstractAccumuloStorage {
return Collections.singleton(mut);
}
- @Override
- public void checkSchema(ResourceSchema s) throws IOException {
- if (!(caster instanceof LoadStoreCaster)) {
- LOG.error("Caster must implement LoadStoreCaster for writing to Accumulo.");
- throw new IOException("Bad Caster " + caster.getClass());
- }
- schema = s;
- getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
- }
-
- private Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
- Object o = tuple.get(i);
- byte type = schemaToType(o, i, fieldSchemas);
-
- return objToText(o, type);
- }
-
- private byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
- return (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
- }
-
- private byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
- Object o = tuple.get(i);
- byte type = schemaToType(o, i, fieldSchemas);
-
- return objToBytes(o, type);
-
- }
-
- private long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
- Object o = tuple.get(i);
- byte type = schemaToType(o, i, fieldSchemas);
-
- switch (type) {
- case DataType.LONG:
- return (Long) o;
- case DataType.CHARARRAY:
- String timestampString = (String) o;
- try {
- return Long.parseLong(timestampString);
- } catch (NumberFormatException e) {
- final String msg = "Could not cast chararray into long: " + timestampString;
- LOG.error(msg);
- throw new IOException(msg, e);
- }
- case DataType.DOUBLE:
- Double doubleTimestamp = (Double) o;
- return doubleTimestamp.longValue();
- case DataType.FLOAT:
- Float floatTimestamp = (Float) o;
- return floatTimestamp.longValue();
- case DataType.INTEGER:
- Integer intTimestamp = (Integer) o;
- return intTimestamp.longValue();
- case DataType.BIGINTEGER:
- BigInteger bigintTimestamp = (BigInteger) o;
- long longTimestamp = bigintTimestamp.longValue();
-
- BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp);
-
- if (!recreatedTimestamp.equals(bigintTimestamp)) {
- LOG.warn("Downcasting BigInteger into Long results in a change of the original value. Was " + bigintTimestamp + " but is now " + longTimestamp);
- }
-
- return longTimestamp;
- case DataType.BIGDECIMAL:
- BigDecimal bigdecimalTimestamp = (BigDecimal) o;
- try {
- return bigdecimalTimestamp.longValueExact();
- } catch (ArithmeticException e) {
- long convertedLong = bigdecimalTimestamp.longValue();
- LOG.warn("Downcasting BigDecimal into Long results in a loss of information. Was " + bigdecimalTimestamp + " but is now " + convertedLong);
- return convertedLong;
- }
- case DataType.BYTEARRAY:
- DataByteArray bytes = (DataByteArray) o;
- try {
- return Long.parseLong(bytes.toString());
- } catch (NumberFormatException e) {
- final String msg = "Could not cast bytes into long: " + bytes.toString();
- LOG.error(msg);
- throw new IOException(msg, e);
- }
- default:
- LOG.error("Could not convert " + o + " of class " + o.getClass() + " into long.");
- throw new IOException("Could not convert " + o.getClass() + " into long");
-
- }
- }
-
- private Text objToText(Object o, byte type) throws IOException {
- return new Text(objToBytes(o, type));
- }
-
- @SuppressWarnings("unchecked")
- private byte[] objToBytes(Object o, byte type) throws IOException {
- if (o == null)
- return null;
- switch (type) {
- case DataType.BYTEARRAY:
- return ((DataByteArray) o).get();
- case DataType.BAG:
- return caster.toBytes((DataBag) o);
- case DataType.CHARARRAY:
- return caster.toBytes((String) o);
- case DataType.DOUBLE:
- return caster.toBytes((Double) o);
- case DataType.FLOAT:
- return caster.toBytes((Float) o);
- case DataType.INTEGER:
- return caster.toBytes((Integer) o);
- case DataType.LONG:
- return caster.toBytes((Long) o);
- case DataType.BIGINTEGER:
- return caster.toBytes((BigInteger) o);
- case DataType.BIGDECIMAL:
- return caster.toBytes((BigDecimal) o);
- case DataType.BOOLEAN:
- return caster.toBytes((Boolean) o);
- case DataType.DATETIME:
- return caster.toBytes((DateTime) o);
-
- // The type conversion here is unchecked.
- // Relying on DataType.findType to do the right thing.
- case DataType.MAP:
- return caster.toBytes((Map<String,Object>) o);
-
- case DataType.NULL:
- return null;
- case DataType.TUPLE:
- return caster.toBytes((Tuple) o);
- case DataType.ERROR:
- throw new IOException("Unable to determine type of " + o.getClass());
- default:
- throw new IOException("Unable to find a converter for tuple field " + o);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/74c01ec2/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
new file mode 100644
index 0000000..c72f07f
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -0,0 +1,134 @@
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Stores each Pig tuple as a single Accumulo row.
+ *
+ * <p>The first field of a tuple is the row. Each following field is matched, by position, to the
+ * comma-separated column specifications given to the constructor:
+ * <ul>
+ * <li>a Map field writes one column per entry. With a specification, the specification is the
+ * column family and the entry key is the qualifier. Without one, the entry key is the column family
+ * and the qualifier is empty.</li>
+ * <li>any other field needs a specification of the form {@code cf} or {@code cf:cq}. Fields without
+ * one are skipped with a warning.</li>
+ * </ul>
+ * Null fields and null map values are skipped. Values are serialized with a
+ * {@link Utf8StorageConverter}. Reading is not implemented yet.
+ */
+public class AccumuloStorage extends AbstractAccumuloStorage {
+  private static final Logger log = Logger.getLogger(AccumuloStorage.class);
+  private static final String COMMA = ",", COLON = ":";
+  private static final Text EMPTY_TEXT = new Text(new byte[0]);
+
+  protected final List<String> columnSpecs;
+
+  /**
+   * Creates a storage function with no column specifications, so only Map fields can be written.
+   */
+  public AccumuloStorage() {
+    this("");
+  }
+
+  /**
+   * @param columns comma-separated column specifications ({@code cf} or {@code cf:cq}), matched by
+   *          position to the tuple fields that follow the row; may be blank
+   */
+  public AccumuloStorage(String columns) {
+    // The inherited serialization helpers delegate to this caster, and checkSchema fails without
+    // one, so it must be set here (as AccumuloKVStorage does in its constructor).
+    this.caster = new Utf8StorageConverter();
+
+    if (!StringUtils.isBlank(columns)) {
+      String[] columnArray = StringUtils.split(columns, COMMA);
+      columnSpecs = Lists.newArrayList(columnArray);
+    } else {
+      columnSpecs = Collections.emptyList();
+    }
+  }
+
+  /**
+   * Not implemented yet: always returns {@code null}.
+   */
+  @Override
+  protected Tuple getTuple(Key key, Value value) throws IOException {
+    // TODO Convert Key/Value pairs back into tuples
+    return null;
+  }
+
+  /**
+   * Converts a tuple into at most one {@link Mutation}.
+   *
+   * @param tuple the row followed by the values to write
+   * @return a singleton list holding the mutation, or an empty list when the tuple has no fields
+   *         after the row or none of them produced a column
+   * @throws IOException if the row is null or a value cannot be serialized
+   */
+  @Override
+  public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
+    final ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
+
+    // Field 0 is the row; without at least one more field there is nothing to write.
+    if (tuple.size() <= 1) {
+      log.debug("Ignoring tuple of size " + tuple.size());
+      return Collections.emptyList();
+    }
+
+    Iterator<Object> tupleIter = tuple.iterator();
+
+    Object row = tupleIter.next();
+    if (null == row) {
+      throw new IOException("Cannot write a tuple with a null row: " + tuple);
+    }
+    Mutation mutation = new Mutation(objectToText(row, fieldSchemaAt(fieldSchemas, 0)));
+
+    // TODO Can these be lifted up to members of the class instead of this method?
+    final Text _cfHolder = new Text(), _cqHolder = new Text();
+
+    int tupleOffset = 0;
+    while (tupleIter.hasNext()) {
+      tupleOffset++;
+      // Column specs are matched positionally to the fields that follow the row
+      final int columnOffset = tupleOffset - 1;
+      final Object o = tupleIter.next();
+
+      if (null == o) {
+        log.debug("Skipping null entry in the tuple at offset " + tupleOffset);
+        continue;
+      }
+
+      // Figure out if the user provided a specific column to use.
+      final String cf = (columnOffset < columnSpecs.size()) ? columnSpecs.get(columnOffset) : null;
+
+      // Grab the type for this field, falling back to the runtime type when no schema describes it
+      final byte type = schemaToType(o, fieldSchemaAt(fieldSchemas, tupleOffset));
+
+      if (DataType.MAP == type) {
+        // Every Map entry becomes a column in this record. The entry's key is the column family,
+        // unless a column family was provided, in which case the key becomes the qualifier.
+        @SuppressWarnings("unchecked")
+        Map<String,Object> map = (Map<String,Object>) o;
+
+        for (Entry<String,Object> entry : map.entrySet()) {
+          Object entryObject = entry.getValue();
+          byte[] entryBytes = objToBytes(entryObject, DataType.findType(entryObject));
+          if (null == entryBytes) {
+            log.debug("Skipping null value for map key " + entry.getKey() + " in the tuple at offset " + tupleOffset);
+            continue;
+          }
+          Value value = new Value(entryBytes);
+
+          if (null != cf) {
+            // Use the provided CF and push the Map's key down to the CQ
+            _cfHolder.set(cf);
+            _cqHolder.set(entry.getKey());
+            mutation.put(_cfHolder, _cqHolder, value);
+          } else {
+            // Just put the Map's key into the CF
+            _cfHolder.set(entry.getKey());
+            mutation.put(_cfHolder, EMPTY_TEXT, value);
+          }
+        }
+      } else if (null == cf) {
+        // We don't know what column to place the value into
+        log.warn("Was provided no column family for non-Map entry in the tuple at offset " + tupleOffset);
+      } else {
+        byte[] bytes = objToBytes(o, type);
+        if (null == bytes) {
+          log.debug("Skipping null entry in the tuple at offset " + tupleOffset);
+          continue;
+        }
+        Value value = new Value(bytes);
+
+        // A non-Map value: the provided spec names the column ("cf" or "cf:cq")
+        int index = cf.indexOf(COLON);
+        if (-1 == index) {
+          _cfHolder.set(cf);
+          mutation.put(_cfHolder, EMPTY_TEXT, value);
+        } else {
+          // Split on the character index and let Text encode each half as UTF-8. Slicing
+          // cf.getBytes() at a character index used the platform charset and cut in the wrong
+          // place whenever the family contained multi-byte characters.
+          _cfHolder.set(cf.substring(0, index));
+          _cqHolder.set(cf.substring(index + 1));
+          mutation.put(_cfHolder, _cqHolder, value);
+        }
+      }
+    }
+
+    if (0 == mutation.size()) {
+      return Collections.emptyList();
+    }
+
+    return Collections.singletonList(mutation);
+  }
+
+  /** Returns the schema of field {@code i}, or null when no schema (or no entry for it) exists. */
+  private static ResourceFieldSchema fieldSchemaAt(ResourceFieldSchema[] fieldSchemas, int i) {
+    return (null == fieldSchemas || i >= fieldSchemas.length) ? null : fieldSchemas[i];
+  }
+}