You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/10/19 01:52:07 UTC
[6/6] git commit: ACCUMULO-1783 Building a "better" typed
AccumuloStorage.
ACCUMULO-1783 Building a "better" typed AccumuloStorage.
Took some hints from the AccumuloStorage and HBaseStorage classes on how
to generalize it. It should provide serialization of any type into and
out of Accumulo.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/c25f26c7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/c25f26c7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/c25f26c7
Branch: refs/heads/1.4
Commit: c25f26c7a1ebf38c80481d4ccc8b19603ca634dd
Parents: d75f91c
Author: Josh Elser <el...@apache.org>
Authored: Fri Oct 18 19:50:09 2013 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Oct 18 19:50:09 2013 -0400
----------------------------------------------------------------------
.../accumulo/pig/TypedAccumuloStorage.java | 207 +++++++++++++++++++
1 file changed, 207 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/c25f26c7/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java
new file mode 100644
index 0000000..30c39c9
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.pig.LoadStoreCaster;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.LoadPushDown.RequiredFieldList;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+import org.joda.time.DateTime;
+
+/**
+ * A LoadStoreFunc for retrieving data from and storing data to Accumulo.
+ *
+ * <p>
+ * On load, each Key/Value pair is returned as a tuple of the form (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are
+ * DataByteArray; timestamp is a long.
+ *
+ * <p>
+ * On store, tuples can be written in 2 forms: (key, colfam, colqual, colvis, value) OR (key, colfam, colqual, value). Each field is converted to bytes
+ * according to the type declared in the schema received by {@link #checkSchema(ResourceSchema)} or, when this instance holds no schema, the type reported by
+ * {@link DataType#findType(Object)}. Types other than bytearray are serialized by {@link #caster}.
+ */
+public class TypedAccumuloStorage extends AbstractAccumuloStorage {
+  private static final Log LOG = LogFactory.getLog(TypedAccumuloStorage.class);
+
+  /** Converts typed Pig values to bytes when writing; defaults to {@link Utf8StorageConverter}. */
+  protected LoadStoreCaster caster;
+
+  /** Signature handed in by Pig; used to look up this UDF's properties in the {@link UDFContext}. */
+  protected String contextSignature = null;
+
+  /** Schema captured by {@link #checkSchema(ResourceSchema)}; null until that method has been called on this instance. */
+  private ResourceSchema schema_;
+
+  // Not referenced anywhere in this class.
+  private RequiredFieldList requiredFieldList;
+
+  /**
+   * Creates a storage function that serializes typed fields with a {@link Utf8StorageConverter}.
+   */
+  public TypedAccumuloStorage() {
+    this.caster = new Utf8StorageConverter();
+  }
+
+  /**
+   * Wraps an Accumulo entry in a 6-field tuple: (row, colfam, colqual, colvis, timestamp, value).
+   *
+   * @param key
+   *          the Accumulo key of the entry
+   * @param value
+   *          the Accumulo value of the entry
+   * @return a tuple whose fields are DataByteArray except for field 4, which is the timestamp as a Long
+   * @throws IOException
+   *           if a field cannot be set on the tuple
+   */
+  @Override
+  protected Tuple getTuple(Key key, Value value) throws IOException {
+    Tuple tuple = TupleFactory.getInstance().newTuple(6);
+    tuple.set(0, textToByteArray(key.getRow()));
+    tuple.set(1, textToByteArray(key.getColumnFamily()));
+    tuple.set(2, textToByteArray(key.getColumnQualifier()));
+    tuple.set(3, textToByteArray(key.getColumnVisibility()));
+    tuple.set(4, Long.valueOf(key.getTimestamp()));
+    tuple.set(5, new DataByteArray(value.get()));
+    return tuple;
+  }
+
+  /**
+   * Copies the valid portion of a Text into a DataByteArray.
+   *
+   * Text.getBytes() exposes the backing buffer, which is only valid up to getLength(), so the copy is bounded by the length rather than by the array size.
+   */
+  private static DataByteArray textToByteArray(Text text) {
+    return new DataByteArray(text.getBytes(), 0, text.getLength());
+  }
+
+  /**
+   * Converts a tuple of the form (key, colfam, colqual, colvis, value) or (key, colfam, colqual, value) into a single Mutation.
+   *
+   * A tuple with more than 4 fields is treated as carrying a column visibility in field 3 and the value in field 4; an empty visibility is written as no
+   * visibility. Null fields are written as empty byte sequences.
+   *
+   * @param tuple
+   *          the tuple to write; must have at least 4 fields
+   * @return a singleton collection holding the Mutation for this tuple
+   * @throws IOException
+   *           if the tuple has fewer than 4 fields or a field cannot be converted to bytes
+   */
+  @Override
+  public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
+    final int fieldCount = tuple.size();
+    if (fieldCount < 4) {
+      throw new IOException("Expected a tuple of (key, colfam, colqual, [colvis,] value) but received " + fieldCount + " field(s)");
+    }
+
+    ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : schema_.getFields();
+
+    Mutation mut = new Mutation(tupleToText(tuple, 0, fieldSchemas));
+    Text cf = tupleToText(tuple, 1, fieldSchemas);
+    Text cq = tupleToText(tuple, 2, fieldSchemas);
+
+    if (fieldCount > 4) {
+      Text cv = tupleToText(tuple, 3, fieldSchemas);
+      Value val = tupleToValue(tuple, 4, fieldSchemas);
+
+      if (cv.getLength() == 0) {
+        mut.put(cf, cq, val);
+      } else {
+        mut.put(cf, cq, new ColumnVisibility(cv), val);
+      }
+    } else {
+      mut.put(cf, cq, tupleToValue(tuple, 3, fieldSchemas));
+    }
+
+    return Collections.singleton(mut);
+  }
+
+  /**
+   * Records the signature Pig assigned to this UDF instance.
+   */
+  @Override
+  public void setUDFContextSignature(String signature) {
+    this.contextSignature = signature;
+  }
+
+  /**
+   * Records the signature Pig assigned to this UDF instance when it is used as a store function.
+   */
+  @Override
+  public void setStoreFuncUDFContextSignature(String signature) {
+    this.contextSignature = signature;
+  }
+
+  /**
+   * Returns UDFProperties based on <code>contextSignature</code>.
+   */
+  private Properties getUDFProperties() {
+    return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] {contextSignature});
+  }
+
+  /**
+   * Remembers the schema of the data being stored and publishes a serialized copy in the UDF properties under the key
+   * <code>contextSignature + "_schema"</code>. This class itself does not read that property back.
+   *
+   * @param s
+   *          the schema of the relation being stored
+   * @throws IOException
+   *           if no caster is configured or the schema cannot be serialized
+   */
+  @Override
+  public void checkSchema(ResourceSchema s) throws IOException {
+    // The field is declared as LoadStoreCaster, so the only way it can be unusable is by being null.
+    if (caster == null) {
+      LOG.error("Caster must implement LoadStoreCaster for writing to Accumulo.");
+      throw new IOException("Bad Caster " + caster);
+    }
+    schema_ = s;
+    getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema_));
+  }
+
+  /**
+   * Converts field <code>i</code> of the tuple to a Text; a null field becomes an empty Text.
+   */
+  private Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+    Object o = tuple.get(i);
+    byte type = schemaToType(o, i, fieldSchemas);
+
+    return objToText(o, type);
+  }
+
+  /**
+   * Determines the Pig type of field <code>i</code>: the schema's declared type when the schema covers that field, otherwise the runtime type of the
+   * object.
+   */
+  private byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
+    if (fieldSchemas == null || i >= fieldSchemas.length || fieldSchemas[i] == null) {
+      return DataType.findType(o);
+    }
+    return fieldSchemas[i].getType();
+  }
+
+  /**
+   * Converts field <code>i</code> of the tuple to bytes.
+   *
+   * @return the serialized field, or null if the field is null
+   */
+  private byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+    Object o = tuple.get(i);
+    byte type = schemaToType(o, i, fieldSchemas);
+
+    return objToBytes(o, type);
+  }
+
+  /**
+   * Converts field <code>i</code> of the tuple to an Accumulo Value; a null field becomes an empty Value.
+   */
+  private Value tupleToValue(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+    byte[] bytes = tupleToBytes(tuple, i, fieldSchemas);
+    if (bytes == null) {
+      LOG.warn("Creating empty value from null tuple field " + i);
+      return new Value(new byte[0]);
+    }
+    return new Value(bytes);
+  }
+
+  /**
+   * Serializes the object and wraps the bytes in a Text; a null object becomes an empty Text.
+   */
+  private Text objToText(Object o, byte type) throws IOException {
+    byte[] bytes = objToBytes(o, type);
+    if (bytes == null) {
+      // Text cannot be constructed from a null array.
+      LOG.warn("Creating empty text from null value");
+      return new Text();
+    }
+    return new Text(bytes);
+  }
+
+  /**
+   * Serializes an object according to its Pig type. Bytearrays are passed through as-is; every other supported type is delegated to {@link #caster}.
+   *
+   * @param o
+   *          the object to serialize, may be null
+   * @param type
+   *          one of the {@link DataType} type constants describing <code>o</code>
+   * @return the serialized bytes, or null when <code>o</code> is null or <code>type</code> is {@link DataType#NULL}
+   * @throws IOException
+   *           if the type is {@link DataType#ERROR}, is not handled here, or the caster fails
+   */
+  @SuppressWarnings("unchecked")
+  private byte[] objToBytes(Object o, byte type) throws IOException {
+    if (o == null) {
+      return null;
+    }
+    switch (type) {
+      case DataType.BYTEARRAY:
+        return ((DataByteArray) o).get();
+      case DataType.BAG:
+        return caster.toBytes((DataBag) o);
+      case DataType.CHARARRAY:
+        return caster.toBytes((String) o);
+      case DataType.DOUBLE:
+        return caster.toBytes((Double) o);
+      case DataType.FLOAT:
+        return caster.toBytes((Float) o);
+      case DataType.INTEGER:
+        return caster.toBytes((Integer) o);
+      case DataType.LONG:
+        return caster.toBytes((Long) o);
+      case DataType.BIGINTEGER:
+        return caster.toBytes((BigInteger) o);
+      case DataType.BIGDECIMAL:
+        return caster.toBytes((BigDecimal) o);
+      case DataType.BOOLEAN:
+        return caster.toBytes((Boolean) o);
+      case DataType.DATETIME:
+        return caster.toBytes((DateTime) o);
+
+      // The type conversion here is unchecked.
+      // Relying on DataType.findType to do the right thing.
+      case DataType.MAP:
+        return caster.toBytes((Map<String,Object>) o);
+
+      case DataType.NULL:
+        return null;
+      case DataType.TUPLE:
+        return caster.toBytes((Tuple) o);
+      case DataType.ERROR:
+        throw new IOException("Unable to determine type of " + o.getClass());
+      default:
+        throw new IOException("Unable to find a converter for tuple field " + o);
+    }
+  }
+
+}