You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/10/31 04:25:13 UTC
[01/10] git commit: ACCUMULO-1783 Clean up the "typed" variant from
before and supersede the original AccumuloStorage.
Updated Branches:
refs/heads/ACCUMULO-1783 [created] 30fd9aa6c
ACCUMULO-1783 Clean up the "typed" variant from before and supersede the
original AccumuloStorage.
Following the model of HBaseStorage, the typical use case is
treating each tuple as a "row" and inserting the multiple columns from a
bag (or similar) as columns in that row. As a step towards this, I
renamed AccumuloStorage to AccumuloKVStorage to make it obvious
that this storage engine provides purely a Key/Value storage
mechanism and nothing more.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/294f9ce8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/294f9ce8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/294f9ce8
Branch: refs/heads/ACCUMULO-1783
Commit: 294f9ce8498db614dac198aaf62d3023c7d9b02d
Parents: c25f26c
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 23 16:48:37 2013 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Wed Oct 23 16:48:37 2013 -0700
----------------------------------------------------------------------
.../accumulo/pig/AbstractAccumuloStorage.java | 13 +-
.../apache/accumulo/pig/AccumuloKVStorage.java | 270 +++++++++++++++++++
.../apache/accumulo/pig/AccumuloStorage.java | 83 ------
.../accumulo/pig/TypedAccumuloStorage.java | 207 --------------
.../accumulo/pig/AccumuloKVStorageTest.java | 134 +++++++++
.../accumulo/pig/AccumuloStorageTest.java | 104 -------
6 files changed, 416 insertions(+), 395 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index d26cf40..0424b8a 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Properties;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
@@ -46,6 +47,7 @@ import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
/**
* A LoadStoreFunc for retrieving data from and storing data to Accumulo
@@ -80,6 +82,8 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
long maxMutationBufferSize = 10 * 1000 * 1000;
int maxLatency = 10 * 1000;
+ /**
+ * Signature Pig assigns via setUDFContextSignature/setStoreFuncUDFContextSignature; null until one of them is called.
+ * Used by getUDFProperties() to scope UDFContext properties to this instance.
+ */
+ protected String contextSignature = null;
+
public AbstractAccumuloStorage() {}
@Override
@@ -213,12 +217,19 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
@Override
public void setUDFContextSignature(String signature) {
-
+ // Retained so getUDFProperties() can look up properties scoped to this signature.
+ this.contextSignature = signature;
}
/* StoreFunc methods */
public void setStoreFuncUDFContextSignature(String signature) {
+ // Shares the contextSignature field with setUDFContextSignature().
+ this.contextSignature = signature;
+ }
+ /**
+ * Returns the UDFContext properties scoped to this class and <code>contextSignature</code>.
+ *
+ * <p>NOTE(review): contextSignature is null until one of the set*UDFContextSignature methods has been called;
+ * confirm callers only use this afterwards.</p>
+ *
+ * @return the Properties that UDFContext associates with this class and signature
+ */
+ protected Properties getUDFProperties() {
+ return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] {contextSignature});
}
public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
new file mode 100644
index 0000000..8a17e8b
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.pig.LoadStoreCaster;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.joda.time.DateTime;
+
+/**
+ * A LoadStoreFunc for retrieving data from and storing data to Accumulo.
+ *
+ * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a
+ * long.
+ *
+ * <p>Tuples require at least key, column family, column qualifier and value; however column visibility or column visibility and timestamp may also be
+ * provided:</p>
+ *
+ * <ul>
+ * <li>(key, colfam, colqual, value)</li>
+ * <li>(key, colfam, colqual, colvis, value)</li>
+ * <li>(key, colfam, colqual, colvis, timestamp, value)</li>
+ * </ul>
+ */
+public class AccumuloKVStorage extends AbstractAccumuloStorage {
+ private static final Log LOG = LogFactory.getLog(AccumuloKVStorage.class);
+ protected LoadStoreCaster caster;
+
+ private ResourceSchema schema;
+
+ public AccumuloKVStorage() {
+ this.caster = new Utf8StorageConverter();
+ }
+
+ @Override
+ protected Tuple getTuple(Key key, Value value) throws IOException {
+ // and wrap it in a tuple
+ Tuple tuple = TupleFactory.getInstance().newTuple(6);
+ tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+ tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
+ tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
+ tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
+ tuple.set(4, new Long(key.getTimestamp()));
+ tuple.set(5, new DataByteArray(value.get()));
+ return tuple;
+ }
+
+ @Override
+ public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
+ ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
+
+ Text t = tupleToText(tuple, 0, fieldSchemas);
+
+ Mutation mut = new Mutation(t);
+ Text cf = tupleToText(tuple, 1, fieldSchemas);
+ Text cq = tupleToText(tuple, 2, fieldSchemas);
+
+ if (4 == tuple.size()) {
+ byte[] valueBytes = tupleToBytes(tuple, 3, fieldSchemas);
+ Value val = new Value(valueBytes);
+
+ mut.put(cf, cq, val);
+ } else if (5 == tuple.size()) {
+ Text cv = tupleToText(tuple, 3, fieldSchemas);
+
+ byte[] valueBytes = tupleToBytes(tuple, 4, fieldSchemas);
+
+ Value val = new Value(valueBytes);
+ if (cv.getLength() == 0) {
+ mut.put(cf, cq, val);
+ } else {
+ mut.put(cf, cq, new ColumnVisibility(cv), val);
+ }
+ } else {
+ if (6 < tuple.size()) {
+ LOG.debug("Ignoring additional entries in tuple of length " + tuple.size());
+ }
+
+ Text cv = tupleToText(tuple, 3, fieldSchemas);
+
+ long ts = objToLong(tuple, 4, fieldSchemas);
+
+ byte[] valueBytes = tupleToBytes(tuple, 5, fieldSchemas);
+
+ Value val = new Value(valueBytes);
+ if (cv.getLength() == 0) {
+ mut.put(cf, cq, val);
+ } else {
+ mut.put(cf, cq, new ColumnVisibility(cv), ts, val);
+ }
+ }
+
+ return Collections.singleton(mut);
+ }
+
+ @Override
+ public void checkSchema(ResourceSchema s) throws IOException {
+ if (!(caster instanceof LoadStoreCaster)) {
+ LOG.error("Caster must implement LoadStoreCaster for writing to Accumulo.");
+ throw new IOException("Bad Caster " + caster.getClass());
+ }
+ schema = s;
+ getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
+ }
+
+ private Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+ Object o = tuple.get(i);
+ byte type = schemaToType(o, i, fieldSchemas);
+
+ return objToText(o, type);
+ }
+
+ private byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
+ return (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
+ }
+
+ private byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+ Object o = tuple.get(i);
+ byte type = schemaToType(o, i, fieldSchemas);
+
+ return objToBytes(o, type);
+
+ }
+
+ private long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+ Object o = tuple.get(i);
+ byte type = schemaToType(o, i, fieldSchemas);
+
+ switch (type) {
+ case DataType.LONG:
+ return (Long) o;
+ case DataType.CHARARRAY:
+ String timestampString = (String) o;
+ try {
+ return Long.parseLong(timestampString);
+ } catch (NumberFormatException e) {
+ final String msg = "Could not cast chararray into long: " + timestampString;
+ LOG.error(msg);
+ throw new IOException(msg, e);
+ }
+ case DataType.DOUBLE:
+ Double doubleTimestamp = (Double) o;
+ return doubleTimestamp.longValue();
+ case DataType.FLOAT:
+ Float floatTimestamp = (Float) o;
+ return floatTimestamp.longValue();
+ case DataType.INTEGER:
+ Integer intTimestamp = (Integer) o;
+ return intTimestamp.longValue();
+ case DataType.BIGINTEGER:
+ BigInteger bigintTimestamp = (BigInteger) o;
+ long longTimestamp = bigintTimestamp.longValue();
+
+ BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp);
+
+ if (!recreatedTimestamp.equals(bigintTimestamp)) {
+ LOG.warn("Downcasting BigInteger into Long results in a change of the original value. Was " + bigintTimestamp + " but is now " + longTimestamp);
+ }
+
+ return longTimestamp;
+ case DataType.BIGDECIMAL:
+ BigDecimal bigdecimalTimestamp = (BigDecimal) o;
+ try {
+ return bigdecimalTimestamp.longValueExact();
+ } catch (ArithmeticException e) {
+ long convertedLong = bigdecimalTimestamp.longValue();
+ LOG.warn("Downcasting BigDecimal into Long results in a loss of information. Was " + bigdecimalTimestamp + " but is now " + convertedLong);
+ return convertedLong;
+ }
+ case DataType.BYTEARRAY:
+ DataByteArray bytes = (DataByteArray) o;
+ try {
+ return Long.parseLong(bytes.toString());
+ } catch (NumberFormatException e) {
+ final String msg = "Could not cast bytes into long: " + bytes.toString();
+ LOG.error(msg);
+ throw new IOException(msg, e);
+ }
+ default:
+ LOG.error("Could not convert " + o + " of class " + o.getClass() + " into long.");
+ throw new IOException("Could not convert " + o.getClass() + " into long");
+
+ }
+ }
+
+ private Text objToText(Object o, byte type) throws IOException {
+ return new Text(objToBytes(o, type));
+ }
+
+ @SuppressWarnings("unchecked")
+ private byte[] objToBytes(Object o, byte type) throws IOException {
+ if (o == null)
+ return null;
+ switch (type) {
+ case DataType.BYTEARRAY:
+ return ((DataByteArray) o).get();
+ case DataType.BAG:
+ return caster.toBytes((DataBag) o);
+ case DataType.CHARARRAY:
+ return caster.toBytes((String) o);
+ case DataType.DOUBLE:
+ return caster.toBytes((Double) o);
+ case DataType.FLOAT:
+ return caster.toBytes((Float) o);
+ case DataType.INTEGER:
+ return caster.toBytes((Integer) o);
+ case DataType.LONG:
+ return caster.toBytes((Long) o);
+ case DataType.BIGINTEGER:
+ return caster.toBytes((BigInteger) o);
+ case DataType.BIGDECIMAL:
+ return caster.toBytes((BigDecimal) o);
+ case DataType.BOOLEAN:
+ return caster.toBytes((Boolean) o);
+ case DataType.DATETIME:
+ return caster.toBytes((DateTime) o);
+
+ // The type conversion here is unchecked.
+ // Relying on DataType.findType to do the right thing.
+ case DataType.MAP:
+ return caster.toBytes((Map<String,Object>) o);
+
+ case DataType.NULL:
+ return null;
+ case DataType.TUPLE:
+ return caster.toBytes((Tuple) o);
+ case DataType.ERROR:
+ throw new IOException("Unable to determine type of " + o.getClass());
+ default:
+ throw new IOException("Unable to find a converter for tuple field " + o);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
deleted file mode 100644
index 15b1c47..0000000
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.accumulo.pig;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
-/**
- * A LoadStoreFunc for retrieving data from and storing data to Accumulo
- *
- * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a
- * long.
- *
- * Tuples can be written in 2 forms: (key, colfam, colqual, colvis, value) OR (key, colfam, colqual, value)
- *
- */
-public class AccumuloStorage extends AbstractAccumuloStorage {
- private static final Log LOG = LogFactory.getLog(AccumuloStorage.class);
-
- public AccumuloStorage() {}
-
- @Override
- protected Tuple getTuple(Key key, Value value) throws IOException {
- // and wrap it in a tuple
- Tuple tuple = TupleFactory.getInstance().newTuple(6);
- tuple.set(0, new DataByteArray(key.getRow().getBytes()));
- tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
- tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
- tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
- tuple.set(4, new Long(key.getTimestamp()));
- tuple.set(5, new DataByteArray(value.get()));
- return tuple;
- }
-
- @Override
- public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
- Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
- Text cf = Utils.objToText(tuple.get(1));
- Text cq = Utils.objToText(tuple.get(2));
-
- if (tuple.size() > 4) {
- Text cv = Utils.objToText(tuple.get(3));
- Value val = new Value(Utils.objToBytes(tuple.get(4)));
- if (cv.getLength() == 0) {
- mut.put(cf, cq, val);
- } else {
- mut.put(cf, cq, new ColumnVisibility(cv), val);
- }
- } else {
- Value val = new Value(Utils.objToBytes(tuple.get(3)));
- mut.put(cf, cq, val);
- }
-
- return Collections.singleton(mut);
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java
deleted file mode 100644
index 30c39c9..0000000
--- a/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.accumulo.pig;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
-import org.apache.pig.LoadStoreCaster;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.LoadPushDown.RequiredFieldList;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.builtin.Utf8StorageConverter;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.impl.util.UDFContext;
-import org.joda.time.DateTime;
-
-/**
- * A LoadStoreFunc for retrieving data from and storing data to Accumulo
- *
- * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a
- * long.
- *
- * Tuples can be written in 2 forms: (key, colfam, colqual, colvis, value) OR (key, colfam, colqual, value)
- *
- */
-public class TypedAccumuloStorage extends AbstractAccumuloStorage {
- private static final Log LOG = LogFactory.getLog(TypedAccumuloStorage.class);
- protected LoadStoreCaster caster;
- protected String contextSignature = null;
-
- private ResourceSchema schema_;
- private RequiredFieldList requiredFieldList;
-
- public TypedAccumuloStorage() {
- this.caster = new Utf8StorageConverter();
- }
-
- @Override
- protected Tuple getTuple(Key key, Value value) throws IOException {
- // and wrap it in a tuple
- Tuple tuple = TupleFactory.getInstance().newTuple(6);
- tuple.set(0, new DataByteArray(key.getRow().getBytes()));
- tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
- tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
- tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
- tuple.set(4, new Long(key.getTimestamp()));
- tuple.set(5, new DataByteArray(value.get()));
- return tuple;
- }
-
- @Override
- public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
- ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : schema_.getFields();
-
- Text t = tupleToText(tuple, 0, fieldSchemas);
-
- Mutation mut = new Mutation(t);
- Text cf = tupleToText(tuple, 1, fieldSchemas);
- Text cq = tupleToText(tuple, 2, fieldSchemas);
-
- if (tuple.size() > 4) {
- Text cv = tupleToText(tuple, 3, fieldSchemas);
-
- byte[] valueBytes = tupleToBytes(tuple, 4, fieldSchemas);
-
- Value val = new Value(valueBytes);
- if (cv.getLength() == 0) {
- mut.put(cf, cq, val);
- } else {
- mut.put(cf, cq, new ColumnVisibility(cv), val);
- }
- } else {
- byte[] valueBytes = tupleToBytes(tuple, 3, fieldSchemas);
- Value val = new Value(valueBytes);
- mut.put(cf, cq, val);
- }
-
- return Collections.singleton(mut);
- }
-
- @Override
- public void setUDFContextSignature(String signature) {
- this.contextSignature = signature;
- }
-
- @Override
- public void setStoreFuncUDFContextSignature(String signature) {
- this.contextSignature = signature;
- }
-
- /**
- * Returns UDFProperties based on <code>contextSignature</code>.
- */
- private Properties getUDFProperties() {
- return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] {contextSignature});
- }
-
- @Override
- public void checkSchema(ResourceSchema s) throws IOException {
- if (!(caster instanceof LoadStoreCaster)) {
- LOG.error("Caster must implement LoadStoreCaster for writing to HBase.");
- throw new IOException("Bad Caster " + caster.getClass());
- }
- schema_ = s;
- getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema_));
- }
-
- private Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
- Object o = tuple.get(i);
- byte type = schemaToType(o, i, fieldSchemas);
-
- return objToText(o, type);
- }
-
- private byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
- return (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
- }
-
- private byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
- Object o = tuple.get(i);
- byte type = schemaToType(o, i, fieldSchemas);
-
- return objToBytes(o, type);
-
- }
-
- private Text objToText(Object o, byte type) throws IOException {
- return new Text(objToBytes(o, type));
- }
-
- @SuppressWarnings("unchecked")
- private byte[] objToBytes(Object o, byte type) throws IOException {
- if (o == null)
- return null;
- switch (type) {
- case DataType.BYTEARRAY:
- return ((DataByteArray) o).get();
- case DataType.BAG:
- return caster.toBytes((DataBag) o);
- case DataType.CHARARRAY:
- return caster.toBytes((String) o);
- case DataType.DOUBLE:
- return caster.toBytes((Double) o);
- case DataType.FLOAT:
- return caster.toBytes((Float) o);
- case DataType.INTEGER:
- return caster.toBytes((Integer) o);
- case DataType.LONG:
- return caster.toBytes((Long) o);
- case DataType.BIGINTEGER:
- return caster.toBytes((BigInteger) o);
- case DataType.BIGDECIMAL:
- return caster.toBytes((BigDecimal) o);
- case DataType.BOOLEAN:
- return caster.toBytes((Boolean) o);
- case DataType.DATETIME:
- return caster.toBytes((DateTime) o);
-
- // The type conversion here is unchecked.
- // Relying on DataType.findType to do the right thing.
- case DataType.MAP:
- return caster.toBytes((Map<String,Object>) o);
-
- case DataType.NULL:
- return null;
- case DataType.TUPLE:
- return caster.toBytes((Tuple) o);
- case DataType.ERROR:
- throw new IOException("Unable to determine type of " + o.getClass());
- default:
- throw new IOException("Unable to find a converter for tuple field " + o);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/test/java/org/apache/accumulo/pig/AccumuloKVStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloKVStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloKVStorageTest.java
new file mode 100644
index 0000000..8adbb52
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloKVStorageTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.pig;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AccumuloKVStorageTest {
+
+  /** Encodes an expected String as UTF-8 (as Utf8StorageConverter does), independent of the platform default charset. */
+  private static byte[] utf8(Object s) throws java.io.UnsupportedEncodingException {
+    return ((String) s).getBytes("UTF-8");
+  }
+
+  /** Creates a tuple containing the given values in order. */
+  private static Tuple tupleOf(Object... values) throws Exception {
+    Tuple tuple = TupleFactory.getInstance().newTuple(values.length);
+    for (int i = 0; i < values.length; i++) {
+      tuple.set(i, values[i]);
+    }
+    return tuple;
+  }
+
+  /** Stores the tuple with a fresh AccumuloKVStorage and returns the only Mutation, asserting it holds exactly one update. */
+  private static Mutation onlyMutation(Tuple tuple) throws Exception {
+    Collection<Mutation> muts = new AccumuloKVStorage().getMutations(tuple);
+    assertNotNull(muts);
+    assertEquals(1, muts.size());
+    Mutation mut = muts.iterator().next();
+    assertEquals(1, mut.getUpdates().size());
+    return mut;
+  }
+
+  @Test
+  public void testGetMutations4() throws Exception {
+    Tuple tuple = tupleOf("row1", "cf1", "cq1", "val1");
+
+    Mutation mut = onlyMutation(tuple);
+    List<ColumnUpdate> updates = mut.getUpdates();
+    ColumnUpdate update = updates.get(0);
+
+    assertTrue(Arrays.equals(utf8(tuple.get(0)), mut.getRow()));
+    assertTrue(Arrays.equals(utf8(tuple.get(1)), update.getColumnFamily()));
+    assertTrue(Arrays.equals(utf8(tuple.get(2)), update.getColumnQualifier()));
+    assertTrue(Arrays.equals(utf8(tuple.get(3)), update.getValue()));
+    assertTrue(Arrays.equals(new byte[0], update.getColumnVisibility()));
+  }
+
+  @Test
+  public void testGetMutations5() throws Exception {
+    Tuple tuple = tupleOf("row1", "cf1", "cq1", "cv1", "val1");
+
+    Mutation mut = onlyMutation(tuple);
+    ColumnUpdate update = mut.getUpdates().get(0);
+
+    assertTrue(Arrays.equals(utf8(tuple.get(0)), mut.getRow()));
+    assertTrue(Arrays.equals(utf8(tuple.get(1)), update.getColumnFamily()));
+    assertTrue(Arrays.equals(utf8(tuple.get(2)), update.getColumnQualifier()));
+    assertTrue(Arrays.equals(utf8(tuple.get(3)), update.getColumnVisibility()));
+    assertTrue(Arrays.equals(utf8(tuple.get(4)), update.getValue()));
+  }
+
+  @Test
+  public void testGetMutations6() throws Exception {
+    Tuple tuple = tupleOf("row", "cf", "cq", "cv", Long.valueOf(1), "value");
+
+    Mutation mut = onlyMutation(tuple);
+    ColumnUpdate update = mut.getUpdates().get(0);
+
+    assertTrue(Arrays.equals(utf8(tuple.get(0)), mut.getRow()));
+    assertTrue(Arrays.equals(utf8(tuple.get(1)), update.getColumnFamily()));
+    assertTrue(Arrays.equals(utf8(tuple.get(2)), update.getColumnQualifier()));
+    assertTrue(Arrays.equals(utf8(tuple.get(3)), update.getColumnVisibility()));
+    assertEquals(((Long) tuple.get(4)).longValue(), update.getTimestamp());
+    assertTrue(Arrays.equals(utf8(tuple.get(5)), update.getValue()));
+  }
+
+  @Test
+  public void testGetMutations6WithChararrayTimestamp() throws Exception {
+    // Without a schema the timestamp's type is inferred; a chararray must be parsed as a decimal long.
+    Tuple tuple = tupleOf("row", "cf", "cq", "cv", "42", "value");
+
+    ColumnUpdate update = onlyMutation(tuple).getUpdates().get(0);
+
+    assertTrue(update.hasTimestamp());
+    assertEquals(42L, update.getTimestamp());
+    assertTrue(Arrays.equals(utf8(tuple.get(5)), update.getValue()));
+  }
+
+  @Test
+  public void testGetMutationsIgnoresExtraFields() throws Exception {
+    Tuple tuple = tupleOf("row", "cf", "cq", "cv", Long.valueOf(7), "value", "extra");
+
+    ColumnUpdate update = onlyMutation(tuple).getUpdates().get(0);
+
+    assertTrue(Arrays.equals(utf8(tuple.get(3)), update.getColumnVisibility()));
+    assertEquals(7L, update.getTimestamp());
+    assertTrue(Arrays.equals(utf8(tuple.get(5)), update.getValue()));
+  }
+
+  @Test
+  public void testGetTuple() throws Exception {
+    AccumuloKVStorage s = new AccumuloKVStorage();
+
+    Key key = new Key("row1", "cf1", "cq1", "cv1", 1024L);
+    Value value = new Value(utf8("val1"));
+    Tuple tuple = s.getTuple(key, value);
+    TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
+
+    key = new Key("row1", "cf1", "cq1");
+    value = new Value(utf8("val1"));
+    tuple = s.getTuple(key, value);
+    TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
deleted file mode 100644
index fbd68c6..0000000
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.accumulo.pig;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.accumulo.core.data.ColumnUpdate;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.junit.Test;
-
-public class AccumuloStorageTest {
-
- @Test
- public void testGetMutations4() throws Exception {
- AccumuloStorage s = new AccumuloStorage();
-
- Tuple tuple = TupleFactory.getInstance().newTuple(4);
- tuple.set(0, "row1");
- tuple.set(1, "cf1");
- tuple.set(2, "cq1");
- tuple.set(3, "val1");
-
- Collection<Mutation> muts = s.getMutations(tuple);
-
- assertNotNull(muts);
- assertEquals(1, muts.size());
- Mutation mut = muts.iterator().next();
- List<ColumnUpdate> updates = mut.getUpdates();
- assertEquals(1, updates.size());
- ColumnUpdate update = updates.get(0);
-
- assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), mut.getRow()));
- assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), update.getColumnFamily()));
- assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), update.getColumnQualifier()));
- assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), update.getValue()));
- assertTrue(Arrays.equals("".getBytes(), update.getColumnVisibility()));
- }
-
- @Test
- public void testGetMutations5() throws Exception {
- AccumuloStorage s = new AccumuloStorage();
-
- Tuple tuple = TupleFactory.getInstance().newTuple(5);
- tuple.set(0, "row1");
- tuple.set(1, "cf1");
- tuple.set(2, "cq1");
- tuple.set(3, "cv1");
- tuple.set(4, "val1");
-
- Collection<Mutation> muts = s.getMutations(tuple);
-
- assertNotNull(muts);
- assertEquals(1, muts.size());
- Mutation mut = muts.iterator().next();
- List<ColumnUpdate> updates = mut.getUpdates();
- assertEquals(1, updates.size());
- ColumnUpdate update = updates.get(0);
-
- assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), mut.getRow()));
- assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), update.getColumnFamily()));
- assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), update.getColumnQualifier()));
- assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), update.getColumnVisibility()));
- assertTrue(Arrays.equals(((String) tuple.get(4)).getBytes(), update.getValue()));
- }
-
- @Test
- public void testGetTuple() throws Exception {
- AccumuloStorage s = new AccumuloStorage();
-
- Key key = new Key("row1", "cf1", "cq1", "cv1", 1024L);
- Value value = new Value("val1".getBytes());
- Tuple tuple = s.getTuple(key, value);
- TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
-
- key = new Key("row1", "cf1", "cq1");
- value = new Value("val1".getBytes());
- tuple = s.getTuple(key, value);
- TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
- }
-}
[08/10] git commit: ACCUMULO-1783 Update the Load impl for the store
changes
Posted by el...@apache.org.
ACCUMULO-1783 Update the Load impl for the store changes
For now, add a boolean option on the constructor to say whether or not
entries with the same column family in a row should be aggregated together
into one Map. Removes implicit rules I had on the cf at storage time.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/e0d3ade8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/e0d3ade8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/e0d3ade8
Branch: refs/heads/ACCUMULO-1783
Commit: e0d3ade8ef858f4299bb93f81fb4e52cf18b7de2
Parents: 291b939
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 30 22:17:39 2013 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Oct 30 22:17:39 2013 -0400
----------------------------------------------------------------------
.../apache/accumulo/pig/AccumuloStorage.java | 15 ++++++-
.../accumulo/pig/AccumuloStorageTest.java | 45 +++++++++++++++++++-
2 files changed, 56 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e0d3ade8/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 97fb44f..cccba64 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -39,6 +39,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
public static final String METADATA_SUFFIX = "_metadata";
protected final List<String> columnSpecs;
+ protected final boolean aggregateColfams;
// Not sure if AccumuloStorage instances need to be thread-safe or not
final Text _cfHolder = new Text(), _cqHolder = new Text();
@@ -47,8 +48,17 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
this("");
}
+ public AccumuloStorage(boolean aggregateColfams) {
+ this("", aggregateColfams);
+ }
+
public AccumuloStorage(String columns) {
+ this(columns, false);
+ }
+
+ public AccumuloStorage(String columns, boolean aggregateColfams) {
this.caster = new Utf8StorageConverter();
+ this.aggregateColfams = aggregateColfams;
// TODO It would be nice to have some other means than enumerating
// the CF for every column in the Tuples we're going process
@@ -77,8 +87,8 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
} else {
Entry<Key,Value> nextEntry = iter.next();
- // If we have the same colfam
- if (currentEntry.getKey().equals(nextEntry.getKey(), PartialKey.ROW_COLFAM)) {
+ // If we're not aggregating colfams together, or we are and we have the same colfam
+ if (!aggregateColfams || currentEntry.getKey().equals(nextEntry.getKey(), PartialKey.ROW_COLFAM)) {
// Aggregate this entry into the map
aggregate.add(nextEntry);
} else {
@@ -208,6 +218,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
*/
protected void addColumn(Mutation mutation, String columnDef, String columnName, Value columnValue) {
if (null == columnDef && null == columnName) {
+ // TODO Emit a counter here somehow?
log.warn("Was provided no name or definition for column. Ignoring value");
return;
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e0d3ade8/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
index db80c47..a02ad7c 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -358,8 +358,8 @@ public class AccumuloStorageTest {
}
@Test
- public void testMultipleColumns() throws IOException {
- AccumuloStorage storage = new AccumuloStorage();
+ public void testMultipleColumnsAggregateColfams() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage(true);
List<Key> keys = Lists.newArrayList();
List<Value> values = Lists.newArrayList();
@@ -406,4 +406,45 @@ public class AccumuloStorageTest {
Assert.assertEquals(map, t.get(3));
}
+ @Test
+ public void testMultipleColumnsNoColfamAggregate() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage(false);
+
+ List<Key> keys = Lists.newArrayList();
+ List<Value> values = Lists.newArrayList();
+
+ keys.add(new Key("1", "col1", "cq1"));
+ keys.add(new Key("1", "col1", "cq2"));
+ keys.add(new Key("1", "col1", "cq3"));
+ keys.add(new Key("1", "col2", "cq1"));
+ keys.add(new Key("1", "col3", "cq1"));
+ keys.add(new Key("1", "col3", "cq2"));
+
+ values.add(new Value("value1".getBytes()));
+ values.add(new Value("value2".getBytes()));
+ values.add(new Value("value3".getBytes()));
+ values.add(new Value("value1".getBytes()));
+ values.add(new Value("value1".getBytes()));
+ values.add(new Value("value2".getBytes()));
+
+ Key k = new Key("1");
+ Value v = WholeRowIterator.encodeRow(keys, values);
+
+ Tuple t = storage.getTuple(k, v);
+
+ Assert.assertEquals(2, t.size());
+
+ Assert.assertEquals("1", t.get(0).toString());
+
+ InternalMap map = new InternalMap();
+ map.put("col1:cq1", new DataByteArray("value1"));
+ map.put("col1:cq2", new DataByteArray("value2"));
+ map.put("col1:cq3", new DataByteArray("value3"));
+ map.put("col2:cq1", new DataByteArray("value1"));
+ map.put("col3:cq1", new DataByteArray("value1"));
+ map.put("col3:cq2", new DataByteArray("value2"));
+
+ Assert.assertEquals(map, t.get(1));
+ }
+
}
[10/10] git commit: ACCUMULO-1783 Ensure that no colqual doesn't add
an extra colon
Posted by el...@apache.org.
ACCUMULO-1783 Ensure that an empty colqual doesn't add an extra colon
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/30fd9aa6
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/30fd9aa6
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/30fd9aa6
Branch: refs/heads/ACCUMULO-1783
Commit: 30fd9aa6c81f41f6d126b01a7e37c9f98f92e338
Parents: 9279c77
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 30 23:18:16 2013 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Oct 30 23:24:28 2013 -0400
----------------------------------------------------------------------
.../apache/accumulo/pig/AccumuloStorage.java | 18 +++++++++++++----
.../accumulo/pig/AccumuloStorageTest.java | 21 +++++++++++++++++---
2 files changed, 32 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/30fd9aa6/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 1788997..dcfd888 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -121,11 +121,21 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
return tuple;
}
- private Map<String,Object> aggregate(List<Entry<Key,Value>> columns) {
- Map<String,Object> map = new HashMap<String,Object>();
+ protected Map<String,Object> aggregate(List<Entry<Key,Value>> columns) {
+ final Map<String,Object> map = new HashMap<String,Object>();
+ final StringBuilder sb = new StringBuilder(128);
+
for (Entry<Key,Value> column : columns) {
- map.put(column.getKey().getColumnFamily().toString() + COLON + column.getKey().getColumnQualifier().toString(),
- new DataByteArray(column.getValue().get()));
+ String cf = column.getKey().getColumnFamily().toString(), cq = column.getKey().getColumnQualifier().toString();
+
+ sb.append(cf);
+ if (!cq.isEmpty()) {
+ sb.append(COLON).append(cq);
+ }
+
+ map.put(sb.toString(), new DataByteArray(column.getValue().get()));
+
+ sb.setLength(0);
}
return map;
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/30fd9aa6/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
index a02ad7c..aa88191 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -204,7 +204,7 @@ public class AccumuloStorageTest {
Entry<String,String> key = Maps.immutableEntry(new String(update.getColumnFamily()), new String(update.getColumnQualifier()));
String value = new String(update.getValue());
Assert.assertTrue(expectations.containsKey(key));
-
+
String actual = expectations.remove(key);
Assert.assertEquals(value, actual);
}
@@ -248,7 +248,7 @@ public class AccumuloStorageTest {
Entry<String,String> key = Maps.immutableEntry(new String(update.getColumnFamily()), new String(update.getColumnQualifier()));
String value = new String(update.getValue());
Assert.assertTrue(expectations.containsKey(key));
-
+
String actual = expectations.remove(key);
Assert.assertEquals(value, actual);
}
@@ -292,7 +292,7 @@ public class AccumuloStorageTest {
Entry<String,String> key = Maps.immutableEntry(new String(update.getColumnFamily()), new String(update.getColumnQualifier()));
String value = new String(update.getValue());
Assert.assertTrue(expectations.containsKey(key));
-
+
String actual = expectations.remove(key);
Assert.assertEquals(value, actual);
}
@@ -447,4 +447,19 @@ public class AccumuloStorageTest {
Assert.assertEquals(map, t.get(1));
}
+ @Test
+ public void testNoExtraCharsOnAggregate() throws Exception {
+ List<Entry<Key,Value>> input = Arrays.asList(Maps.immutableEntry(new Key("1", "cf1"), new Value("foo".getBytes())),
+ Maps.immutableEntry(new Key("1", "cf2"), new Value("bar".getBytes())));
+
+ AccumuloStorage storage = new AccumuloStorage();
+
+ Map<String,Object> aggregate = storage.aggregate(input);
+
+ Assert.assertTrue(aggregate.containsKey("cf1"));
+ Assert.assertTrue(aggregate.containsKey("cf2"));
+ Assert.assertEquals("foo", aggregate.get("cf1").toString());
+ Assert.assertEquals("bar", aggregate.get("cf2").toString());
+ }
+
}
[06/10] git commit: ACCUMULO-1783 Reworking the storage side.
Posted by el...@apache.org.
ACCUMULO-1783 Reworking the storage side.
Took an approach like HBase's for the "regular" AccumuloStorage class.
Normal tuples are treated as a row, with the first entry being the
rowkey and subsequent entries as column values. Maps are expanded as
column:value pairs; any scalars, bags, or tuples require a column to be
specified in the AccumuloStorage constructor. Lots of nice unit tests for
the functionality.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/ad03c51b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/ad03c51b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/ad03c51b
Branch: refs/heads/ACCUMULO-1783
Commit: ad03c51b3e5ee8baea76c0df6a4ca7c8df2b0606
Parents: 8c46d9b
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 30 20:24:06 2013 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Oct 30 20:24:06 2013 -0400
----------------------------------------------------------------------
pom.xml | 18 +++
.../accumulo/pig/AbstractAccumuloStorage.java | 9 +-
.../apache/accumulo/pig/AccumuloKVStorage.java | 6 +-
.../apache/accumulo/pig/AccumuloStorage.java | 121 +++++++++++--------
.../java/org/apache/accumulo/pig/FORMAT.java | 25 ++++
.../accumulo/pig/AccumuloStorageTest.java | 18 +--
6 files changed, 134 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 630d5e2..b096123 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,6 +51,24 @@
<artifactId>guava</artifactId>
<version>15.0</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-minicluster</artifactId>
+ <version>1.4.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pig</groupId>
+ <artifactId>pigunit</artifactId>
+ <version>0.12.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 2361dcf..c2345cc 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -394,7 +394,14 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
}
protected Text objToText(Object o, byte type) throws IOException {
- return new Text(objToBytes(o, type));
+ byte[] bytes = objToBytes(o, type);
+
+ if (null == bytes) {
+ LOG.warn("Creating empty text from null value");
+ return new Text();
+ }
+
+ return new Text(bytes);
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
index 8462985..13b34ce 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
@@ -44,9 +44,9 @@ import org.apache.pig.data.TupleFactory;
* provided:</p>
*
* <ul>
- * <li>(key, colfam, colqual, value)</li>
- * <li>(key, colfam, colqual, colvis, value)</li>
- * <li>(key, colfam, colqual, colvis, timestamp, value)</li>
+ * <li>(row, colfam, colqual, value)</li>
+ * <li>(row, colfam, colqual, colvis, value)</li>
+ * <li>(row, colfam, colqual, colvis, timestamp, value)</li>
* </ul>
*/
public class AccumuloKVStorage extends AbstractAccumuloStorage {
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 030b0c3..97fb44f 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -1,7 +1,6 @@
package org.apache.accumulo.pig;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -37,8 +36,13 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
private static final String COMMA = ",", COLON = ":";
private static final Text EMPTY_TEXT = new Text(new byte[0]);
+ public static final String METADATA_SUFFIX = "_metadata";
+
protected final List<String> columnSpecs;
+ // Not sure if AccumuloStorage instances need to be thread-safe or not
+ final Text _cfHolder = new Text(), _cqHolder = new Text();
+
public AccumuloStorage() {
this("");
}
@@ -46,6 +50,8 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
public AccumuloStorage(String columns) {
this.caster = new Utf8StorageConverter();
+ // TODO It would be nice to have some other means than enumerating
+ // the CF for every column in the Tuples we're going process
if (!StringUtils.isBlank(columns)) {
String[] columnArray = StringUtils.split(columns, COMMA);
columnSpecs = Lists.newArrayList(columnArray);
@@ -121,7 +127,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
@Override
public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
final ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
-
+
Iterator<Object> tupleIter = tuple.iterator();
if (1 >= tuple.size()) {
@@ -129,25 +135,21 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
return Collections.emptyList();
}
- Mutation mutation = new Mutation(objectToText(tupleIter.next(), (null == fieldSchemas) ? null : fieldSchemas[0]));
-
- // TODO Can these be lifted up to members of the class instead of this method?
- // Not sure if AccumuloStorage instances need to be thread-safe or not
- final Text _cfHolder = new Text(), _cqHolder = new Text();
+ Mutation mutation = new Mutation(objectToText(tupleIter.next(), (null == fieldSchemas) ? null : fieldSchemas[0]));
int columnOffset = 0;
int tupleOffset = 1;
while (tupleIter.hasNext()) {
Object o = tupleIter.next();
- String cf = null;
+ String family = null;
// Figure out if the user provided a specific columnfamily to use.
if (columnOffset < columnSpecs.size()) {
- cf = columnSpecs.get(columnOffset);
+ family = columnSpecs.get(columnOffset);
}
// Grab the type for this field
- byte type = schemaToType(o, (null == fieldSchemas) ? null : fieldSchemas[tupleOffset]);
+ final byte type = schemaToType(o, (null == fieldSchemas) ? null : fieldSchemas[tupleOffset]);
// If we have a Map, we want to treat every Entry as a column in this record
// placing said column in the column family unless this instance of AccumuloStorage
@@ -159,53 +161,25 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
for (Entry<String,Object> entry : map.entrySet()) {
Object entryObject = entry.getValue();
- byte entryType = DataType.findType(entryObject);
-
- Value value = new Value(objToBytes(entryObject, entryType));
- // If we have a CF, use it and push the Map's key down to the CQ
- if (null != cf) {
- int index = cf.indexOf(COLON);
+ // Treat a null value in the map as the lack of this column
+ // The input may have come from a structured source where the
+ // column could not have been omitted. We can handle the lack of the column
+ if (null != entryObject) {
+ byte entryType = DataType.findType(entryObject);
+ Value value = new Value(objToBytes(entryObject, entryType));
- // No colon in the provided column
- if (-1 == index) {
- _cfHolder.set(cf);
- _cqHolder.set(entry.getKey());
-
- mutation.put(_cfHolder, _cqHolder, value);
- } else {
- _cfHolder.set(cf.getBytes(), 0, index);
-
- _cqHolder.set(cf.getBytes(), index + 1, cf.length() - (index + 1));
- _cqHolder.append(entry.getKey().getBytes(), 0, entry.getKey().length());
-
- mutation.put(_cfHolder, _cqHolder, value);
- }
- } else {
- // Just put the Map's key into the CQ
- _cqHolder.set(entry.getKey());
- mutation.put(EMPTY_TEXT, _cqHolder, value);
+ addColumn(mutation, family, entry.getKey(), value);
}
}
- } else if (null == cf) {
- // We don't know what column to place the value into
- log.warn("Was provided no column family for non-Map entry in the tuple at offset " + tupleOffset);
} else {
- Value value = new Value(objToBytes(o, type));
+ byte[] bytes = objToBytes(o, type);
- // We have something that isn't a Map, use the provided CF as a column name
- // and then shove the value into the Value
- int index = cf.indexOf(COLON);
- if (-1 == index) {
- _cqHolder.set(cf);
+ if (null != bytes) {
+ Value value = new Value(bytes);
- mutation.put(EMPTY_TEXT, _cqHolder, value);
- } else {
- byte[] cfBytes = cf.getBytes();
- _cfHolder.set(cfBytes, 0, index);
- _cqHolder.set(cfBytes, index + 1, cfBytes.length - (index + 1));
-
- mutation.put(_cfHolder, _cqHolder, value);
+ // We don't have any column name from non-Maps
+ addColumn(mutation, family, null, value);
}
}
@@ -219,4 +193,51 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
return Collections.singletonList(mutation);
}
+
+ /**
+ * Adds column and value to the given mutation. A columnfamily and optional column qualifier
+ * or column qualifier prefix is pulled from {@link columnDef} with the family and qualifier
+ * delimiter being a colon. If {@link columnName} is non-null, it will be appended to the qualifier.
+ *
+ * If both the {@link columnDef} and {@link columnName} are null, nothing is added to the mutation
+ *
+ * @param mutation
+ * @param columnDef
+ * @param columnName
+ * @param columnValue
+ */
+ protected void addColumn(Mutation mutation, String columnDef, String columnName, Value columnValue) {
+ if (null == columnDef && null == columnName) {
+ log.warn("Was provided no name or definition for column. Ignoring value");
+ return;
+ }
+
+ if (null != columnDef) {
+ // use the provided columnDef to make a cf (with optional cq prefix)
+ int index = columnDef.indexOf(COLON);
+ if (-1 == index) {
+ _cfHolder.set(columnDef);
+ _cqHolder.clear();
+
+ } else {
+ byte[] cfBytes = columnDef.getBytes();
+ _cfHolder.set(cfBytes, 0, index);
+ _cqHolder.set(cfBytes, index + 1, cfBytes.length - (index + 1));
+ }
+ } else {
+ _cfHolder.clear();
+ _cqHolder.clear();
+ }
+
+ // If we have a column name (this came from a Map)
+ // append that name on the cq.
+ if (null != columnName) {
+ byte[] cnBytes = columnName.getBytes();
+
+ // CQ is either empty or has a prefix from the columnDef
+ _cqHolder.append(cnBytes, 0, cnBytes.length);
+ }
+
+ mutation.put(_cfHolder, _cqHolder, columnValue);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/main/java/org/apache/accumulo/pig/FORMAT.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/FORMAT.java b/src/main/java/org/apache/accumulo/pig/FORMAT.java
new file mode 100644
index 0000000..a72987a
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/pig/FORMAT.java
@@ -0,0 +1,25 @@
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+
+public class FORMAT extends EvalFunc<String> {
+
+ @Override
+ public String exec(Tuple input) throws IOException {
+ if (0 == input.size()) {
+ return null;
+ }
+
+ final String format = input.get(0).toString();
+ Object[] args = new Object[input.size() - 1];
+ for (int i = 1; i < input.size(); i++) {
+ args[i-1] = input.get(i);
+ }
+
+ return String.format(format, args);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
index 10777a8..db80c47 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -67,8 +67,8 @@ public class AccumuloStorageTest {
Assert.assertEquals(1, colUpdates.size());
ColumnUpdate colUpdate = colUpdates.get(0);
- Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
- Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col".getBytes()));
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes()));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value".getBytes()));
}
@@ -120,8 +120,8 @@ public class AccumuloStorageTest {
Assert.assertEquals(4, colUpdates.size());
ColumnUpdate colUpdate = colUpdates.get(0);
- Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
- Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col1".getBytes()));
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col1".getBytes()));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value1".getBytes()));
colUpdate = colUpdates.get(1);
@@ -135,8 +135,8 @@ public class AccumuloStorageTest {
Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value3".getBytes()));
colUpdate = colUpdates.get(3);
- Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
- Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col2".getBytes()));
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col2".getBytes()));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value4".getBytes()));
}
@@ -161,8 +161,8 @@ public class AccumuloStorageTest {
Assert.assertEquals(1, colUpdates.size());
ColumnUpdate colUpdate = colUpdates.get(0);
- Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
- Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col".getBytes()));
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes()));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value1".getBytes()));
}
@@ -194,7 +194,7 @@ public class AccumuloStorageTest {
Assert.assertEquals(5, colUpdates.size());
Map<Entry<String,String>,String> expectations = Maps.newHashMap();
- expectations.put(Maps.immutableEntry("", "col"), "value1");
+ expectations.put(Maps.immutableEntry("col", ""), "value1");
expectations.put(Maps.immutableEntry("", "mapcol1"), "mapval1");
expectations.put(Maps.immutableEntry("", "mapcol2"), "mapval2");
expectations.put(Maps.immutableEntry("", "mapcol3"), "mapval3");
[09/10] git commit: ACCUMULO-1783 Remove the InternalMap usage to
make the schema declaration in the value of the AS command in a LOAD
Posted by el...@apache.org.
ACCUMULO-1783 Remove the InternalMap usage so that the schema of the
value can be declared in the AS clause of a LOAD
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/9279c77b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/9279c77b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/9279c77b
Branch: refs/heads/ACCUMULO-1783
Commit: 9279c77bba29470894ed6bb08fa626a67be6952c
Parents: e0d3ade
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 30 22:25:39 2013 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Oct 30 22:25:39 2013 -0400
----------------------------------------------------------------------
src/main/java/org/apache/accumulo/pig/AccumuloStorage.java | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9279c77b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index cccba64..1788997 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -3,6 +3,7 @@ package org.apache.accumulo.pig;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -95,7 +96,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
currentEntry = nextEntry;
// Flush and start again
- InternalMap map = aggregate(aggregate);
+ Map<String,Object> map = aggregate(aggregate);
tupleEntries.add(map);
aggregate = Lists.newLinkedList();
@@ -110,7 +111,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
// and wrap it in a tuple
Tuple tuple = TupleFactory.getInstance().newTuple(tupleEntries.size() + 1);
- tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+ tuple.set(0, key.getRow().toString());
int i = 1;
for (Object obj : tupleEntries) {
tuple.set(i, obj);
@@ -120,8 +121,8 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
return tuple;
}
- private InternalMap aggregate(List<Entry<Key,Value>> columns) {
- InternalMap map = new InternalMap();
+ private Map<String,Object> aggregate(List<Entry<Key,Value>> columns) {
+ Map<String,Object> map = new HashMap<String,Object>();
for (Entry<Key,Value> column : columns) {
map.put(column.getKey().getColumnFamily().toString() + COLON + column.getKey().getColumnQualifier().toString(),
new DataByteArray(column.getValue().get()));
[03/10] git commit: ACCUMULO-1783 Rework AccumuloStorage,
diverging a little from AccumuloWholeRowStorage. Not sure yet if this
is completely desirable or not.
Posted by el...@apache.org.
ACCUMULO-1783 Rework AccumuloStorage, diverging a little from
AccumuloWholeRowStorage. It is not yet clear whether this is entirely
desirable.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/58e5a7ec
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/58e5a7ec
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/58e5a7ec
Branch: refs/heads/ACCUMULO-1783
Commit: 58e5a7ec00a9198b7f31ac910ee61f1d704aaec7
Parents: 74c01ec
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 24 13:33:32 2013 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 24 13:33:32 2013 -0700
----------------------------------------------------------------------
.../apache/accumulo/pig/AccumuloStorage.java | 93 ++++++++++++++++++--
.../accumulo/pig/AccumuloWholeRowStorage.java | 2 +-
.../pig/AbstractAccumuloStorageTest.java | 2 +-
.../pig/AccumuloWholeRowStorageTest.java | 2 +-
.../java/org/apache/accumulo/pig/TestUtils.java | 4 +-
5 files changed, 89 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/58e5a7ec/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index c72f07f..9c8f002 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -1,23 +1,33 @@
package org.apache.accumulo.pig;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.SortedMap;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalMap;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import com.google.common.collect.Lists;
@@ -39,8 +49,72 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
@Override
protected Tuple getTuple(Key key, Value value) throws IOException {
- // TODO Auto-generated method stub
- return null;
+
+ SortedMap<Key,Value> rowKVs = WholeRowIterator.decodeRow(key, value);
+ List<Tuple> columns = new ArrayList<Tuple>(rowKVs.size());
+
+ List<Object> tupleEntries = Lists.newLinkedList();
+ Iterator<Entry<Key,Value>> iter = rowKVs.entrySet().iterator();
+ List<Entry<Key,Value>> aggregate = Lists.newLinkedList();
+ Entry<Key,Value> currentEntry = null;
+
+ while (iter.hasNext()) {
+ if (null == currentEntry) {
+ currentEntry = iter.next();
+ } else {
+ Entry<Key,Value> nextEntry = iter.next();
+
+ // If we have the same colfam
+ if (currentEntry.getKey().equals(nextEntry.getKey(), PartialKey.ROW_COLFAM)) {
+ // Aggregate this entry into the map
+ aggregate.add(nextEntry);
+ } else {
+ // Flush and start again
+ InternalMap map = aggregate(aggregate);
+ tupleEntries.add(map);
+
+ aggregate = Lists.newLinkedList();
+ }
+ }
+ }
+
+ if (!aggregate.isEmpty()) {
+ tupleEntries.add(aggregate(aggregate));
+ }
+
+ // and wrap it in a tuple
+ Tuple tuple = TupleFactory.getInstance().newTuple(tupleEntries.size() + 1);
+ tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+ int i = 1;
+ for (Object obj : tupleEntries) {
+ tuple.set(i, obj);
+ i++;
+ }
+
+ return tuple;
+ }
+
+ private InternalMap aggregate(List<Entry<Key,Value>> columns) {
+ InternalMap map = new InternalMap();
+ for (Entry<Key,Value> column : columns) {
+ map.put(column.getKey().getColumnQualifier().toString(), new DataByteArray(column.getValue().get()));
+ }
+
+ return map;
+ }
+
+ private Tuple columnToTuple(Text colfam, Text colqual, Text colvis, long ts, Value val) throws IOException {
+ Tuple tuple = TupleFactory.getInstance().newTuple(5);
+ tuple.set(0, new DataByteArray(colfam.getBytes()));
+ tuple.set(1, new DataByteArray(colqual.getBytes()));
+ tuple.set(2, new DataByteArray(colvis.getBytes()));
+ tuple.set(3, new Long(ts));
+ tuple.set(4, new DataByteArray(val.get()));
+ return tuple;
+ }
+
+ protected void configureInputFormat(Configuration conf) {
+ AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
}
@Override
@@ -57,6 +131,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
Mutation mutation = new Mutation(objectToText(tupleIter.next(), (null == fieldSchemas) ? null : fieldSchemas[0]));
// TODO Can these be lifted up to members of the class instead of this method?
+ // Not sure if AccumuloStorage instances need to be thread-safe or not
final Text _cfHolder = new Text(), _cqHolder = new Text();
int columnOffset = 0;
@@ -94,9 +169,9 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
mutation.put(_cfHolder, _cqHolder, value);
} else {
- // Just put the Map's key into the CF
- _cfHolder.set(entry.getKey());
- mutation.put(_cfHolder, EMPTY_TEXT, value);
+ // Just put the Map's key into the CQ
+ _cqHolder.set(entry.getKey());
+ mutation.put(EMPTY_TEXT, _cqHolder, value);
}
}
} else if (null == cf) {
@@ -109,13 +184,13 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
// and then shove the value into the Value
int index = cf.indexOf(COLON);
if (-1 == index) {
- _cfHolder.set(cf);
+ _cqHolder.set(cf);
- mutation.put(_cfHolder, EMPTY_TEXT, value);
+ mutation.put(EMPTY_TEXT, _cqHolder, value);
} else {
- byte[] cfBytes = cf.getBytes();
+ byte[] cfBytes = cf.getBytes();
_cfHolder.set(cfBytes, 0, index);
- _cqHolder.set(cfBytes, index+1, cfBytes.length - (index + 1));
+ _cqHolder.set(cfBytes, index + 1, cfBytes.length - (index + 1));
mutation.put(_cfHolder, _cqHolder, value);
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/58e5a7ec/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
index fcfd55e..af3ee01 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
@@ -84,7 +84,7 @@ public class AccumuloWholeRowStorage extends AbstractAccumuloStorage {
}
protected void configureInputFormat(Configuration conf) {
- AccumuloInputFormat.addIterator(conf, new IteratorSetting(10, WholeRowIterator.class));
+ AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/58e5a7ec/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index 9b9c3c7..1b5b81a 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -98,7 +98,7 @@ public class AbstractAccumuloStorageTest {
}
public String getDefaultLoadLocation() {
- return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2,col3&start=abc&end=z";
+ return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&fetch_columns=col1:cq1,col2:cq2,col3&start=abc&end=z";
}
public String getDefaultStoreLocation() {
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/58e5a7ec/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
index f8e8fe1..690d86c 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
@@ -58,7 +58,7 @@ public class AccumuloWholeRowStorageTest {
Job expected = test.getDefaultExpectedLoadJob();
Configuration expectedConf = expected.getConfiguration();
- AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(10, WholeRowIterator.class));
+ AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(50, WholeRowIterator.class));
TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/58e5a7ec/src/test/java/org/apache/accumulo/pig/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/TestUtils.java b/src/test/java/org/apache/accumulo/pig/TestUtils.java
index 6c8bebf..307a871 100644
--- a/src/test/java/org/apache/accumulo/pig/TestUtils.java
+++ b/src/test/java/org/apache/accumulo/pig/TestUtils.java
@@ -39,14 +39,14 @@ public class TestUtils {
Iterator<Entry<String,String>> expectedIter = expectedConf.iterator();
while (expectedIter.hasNext()) {
Entry<String,String> e = expectedIter.next();
- assertEquals(actualConf.get(e.getKey()), expectedConf.get(e.getKey()));
+ assertEquals("Values differed for " + e.getKey(), expectedConf.get(e.getKey()), actualConf.get(e.getKey()));
}
// Basically, for all the keys in actualConf, make sure the values in both confs are equal
Iterator<Entry<String,String>> actualIter = actualConf.iterator();
while (actualIter.hasNext()) {
Entry<String,String> e = actualIter.next();
- assertEquals(actualConf.get(e.getKey()), expectedConf.get(e.getKey()));
+ assertEquals("Values differed for " + e.getKey(), expectedConf.get(e.getKey()), actualConf.get(e.getKey()));
}
}
[02/10] git commit: ACCUMULO-1783 Add in the implementation for
getMutations for AccumuloStorage.
Posted by el...@apache.org.
ACCUMULO-1783 Add the implementation of getMutations for
AccumuloStorage.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/74c01ec2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/74c01ec2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/74c01ec2
Branch: refs/heads/ACCUMULO-1783
Commit: 74c01ec215ee7c50fd75c16d606fc8c073f25c8f
Parents: 294f9ce
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 24 12:19:29 2013 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 24 12:19:29 2013 -0700
----------------------------------------------------------------------
pom.xml | 5 +
.../accumulo/pig/AbstractAccumuloStorage.java | 186 +++++++++++++++++--
.../apache/accumulo/pig/AccumuloKVStorage.java | 150 ---------------
.../apache/accumulo/pig/AccumuloStorage.java | 134 +++++++++++++
4 files changed, 312 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/74c01ec2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 249dcce..630d5e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,11 @@
<artifactId>joda-time</artifactId>
<version>1.6</version>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>15.0</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/74c01ec2/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 0424b8a..494fd72 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -17,10 +17,13 @@
package org.apache.accumulo.pig;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
@@ -31,6 +34,7 @@ import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -42,12 +46,19 @@ import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadStoreCaster;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
+import org.joda.time.DateTime;
/**
* A LoadStoreFunc for retrieving data from and storing data to Accumulo
@@ -61,6 +72,8 @@ import org.apache.pig.impl.util.UDFContext;
public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface {
private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
+ private static final String COLON = ":", COMMA = ",";
+
private Configuration conf;
private RecordReader<Key,Value> reader;
private RecordWriter<Text,Mutation> writer;
@@ -81,7 +94,9 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
int maxWriteThreads = 10;
long maxMutationBufferSize = 10 * 1000 * 1000;
int maxLatency = 10 * 1000;
-
+
+ protected LoadStoreCaster caster;
+ protected ResourceSchema schema;
protected String contextSignature = null;
public AbstractAccumuloStorage() {}
@@ -118,7 +133,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
private void setLocationFromUri(String location) throws IOException {
// ex:
- // accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&start=abc&end=z
+ // accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&fetch_columns=col1:cq1,col2:cq2&start=abc&end=z
String columns = "";
try {
if (!location.startsWith("accumulo://"))
@@ -137,7 +152,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
zookeepers = pair[1];
else if (pair[0].equals("auths"))
auths = pair[1];
- else if (pair[0].equals("columns"))
+ else if (pair[0].equals("fetch_columns"))
columns = pair[1];
else if (pair[0].equals("start"))
start = pair[1];
@@ -158,13 +173,13 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
if (auths == null || auths.equals("")) {
authorizations = new Authorizations();
} else {
- authorizations = new Authorizations(auths.split(","));
+ authorizations = new Authorizations(auths.split(COMMA));
}
- if (!columns.equals("")) {
- for (String cfCq : columns.split(",")) {
- if (cfCq.contains("|")) {
- String[] c = cfCq.split("\\|");
+ if (!StringUtils.isEmpty(columns)) {
+ for (String cfCq : columns.split(COMMA)) {
+ if (cfCq.contains(COLON)) {
+ String[] c = cfCq.split(COLON);
columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text(c[0]), new Text(c[1])));
} else {
columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text(cfCq), null));
@@ -175,7 +190,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
} catch (Exception e) {
throw new IOException(
"Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&"
- + "[start=startRow,end=endRow,columns=[cf1|cq1,cf2|cq2,...],write_buffer_size_bytes=10000000,write_threads=10,write_latency_ms=30000]]': "
+ + "[start=startRow,end=endRow,fetch_columns=[cf1:cq1,cf2:cq2,...],write_buffer_size_bytes=10000000,write_threads=10,write_latency_ms=30000]]': "
+ e.getMessage());
}
}
@@ -255,10 +270,6 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
return new AccumuloOutputFormat();
}
- public void checkSchema(ResourceSchema schema) throws IOException {
- // we don't care about types, they all get casted to ByteBuffers
- }
-
@SuppressWarnings({"rawtypes", "unchecked"})
public void prepareToWrite(RecordWriter writer) {
this.writer = writer;
@@ -280,4 +291,153 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
public void cleanupOnFailure(String failure, Job job) {}
public void cleanupOnSuccess(String location, Job job) {}
+
+ @Override
+ public void checkSchema(ResourceSchema s) throws IOException {
+ if (!(caster instanceof LoadStoreCaster)) {
+ LOG.error("Caster must implement LoadStoreCaster for writing to Accumulo.");
+ throw new IOException("Bad Caster " + caster.getClass());
+ }
+ schema = s;
+ getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
+ }
+
+
+ protected Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+ Object o = tuple.get(i);
+ byte type = schemaToType(o, i, fieldSchemas);
+
+ return objToText(o, type);
+ }
+
+ protected Text objectToText(Object o, ResourceFieldSchema fieldSchema) throws IOException {
+ byte type = schemaToType(o, fieldSchema);
+
+ return objToText(o, type);
+ }
+
+ protected byte schemaToType(Object o, ResourceFieldSchema fieldSchema) {
+ return (fieldSchema == null) ? DataType.findType(o) : fieldSchema.getType();
+ }
+
+ protected byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
+ return (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
+ }
+
+ protected byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+ Object o = tuple.get(i);
+ byte type = schemaToType(o, i, fieldSchemas);
+
+ return objToBytes(o, type);
+
+ }
+
+ protected long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+ Object o = tuple.get(i);
+ byte type = schemaToType(o, i, fieldSchemas);
+
+ switch (type) {
+ case DataType.LONG:
+ return (Long) o;
+ case DataType.CHARARRAY:
+ String timestampString = (String) o;
+ try {
+ return Long.parseLong(timestampString);
+ } catch (NumberFormatException e) {
+ final String msg = "Could not cast chararray into long: " + timestampString;
+ LOG.error(msg);
+ throw new IOException(msg, e);
+ }
+ case DataType.DOUBLE:
+ Double doubleTimestamp = (Double) o;
+ return doubleTimestamp.longValue();
+ case DataType.FLOAT:
+ Float floatTimestamp = (Float) o;
+ return floatTimestamp.longValue();
+ case DataType.INTEGER:
+ Integer intTimestamp = (Integer) o;
+ return intTimestamp.longValue();
+ case DataType.BIGINTEGER:
+ BigInteger bigintTimestamp = (BigInteger) o;
+ long longTimestamp = bigintTimestamp.longValue();
+
+ BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp);
+
+ if (!recreatedTimestamp.equals(bigintTimestamp)) {
+ LOG.warn("Downcasting BigInteger into Long results in a change of the original value. Was " + bigintTimestamp + " but is now " + longTimestamp);
+ }
+
+ return longTimestamp;
+ case DataType.BIGDECIMAL:
+ BigDecimal bigdecimalTimestamp = (BigDecimal) o;
+ try {
+ return bigdecimalTimestamp.longValueExact();
+ } catch (ArithmeticException e) {
+ long convertedLong = bigdecimalTimestamp.longValue();
+ LOG.warn("Downcasting BigDecimal into Long results in a loss of information. Was " + bigdecimalTimestamp + " but is now " + convertedLong);
+ return convertedLong;
+ }
+ case DataType.BYTEARRAY:
+ DataByteArray bytes = (DataByteArray) o;
+ try {
+ return Long.parseLong(bytes.toString());
+ } catch (NumberFormatException e) {
+ final String msg = "Could not cast bytes into long: " + bytes.toString();
+ LOG.error(msg);
+ throw new IOException(msg, e);
+ }
+ default:
+ LOG.error("Could not convert " + o + " of class " + o.getClass() + " into long.");
+ throw new IOException("Could not convert " + o.getClass() + " into long");
+
+ }
+ }
+
+ protected Text objToText(Object o, byte type) throws IOException {
+ return new Text(objToBytes(o, type));
+ }
+
+ @SuppressWarnings("unchecked")
+ protected byte[] objToBytes(Object o, byte type) throws IOException {
+ if (o == null)
+ return null;
+ switch (type) {
+ case DataType.BYTEARRAY:
+ return ((DataByteArray) o).get();
+ case DataType.BAG:
+ return caster.toBytes((DataBag) o);
+ case DataType.CHARARRAY:
+ return caster.toBytes((String) o);
+ case DataType.DOUBLE:
+ return caster.toBytes((Double) o);
+ case DataType.FLOAT:
+ return caster.toBytes((Float) o);
+ case DataType.INTEGER:
+ return caster.toBytes((Integer) o);
+ case DataType.LONG:
+ return caster.toBytes((Long) o);
+ case DataType.BIGINTEGER:
+ return caster.toBytes((BigInteger) o);
+ case DataType.BIGDECIMAL:
+ return caster.toBytes((BigDecimal) o);
+ case DataType.BOOLEAN:
+ return caster.toBytes((Boolean) o);
+ case DataType.DATETIME:
+ return caster.toBytes((DateTime) o);
+
+ // The type conversion here is unchecked.
+ // Relying on DataType.findType to do the right thing.
+ case DataType.MAP:
+ return caster.toBytes((Map<String,Object>) o);
+
+ case DataType.NULL:
+ return null;
+ case DataType.TUPLE:
+ return caster.toBytes((Tuple) o);
+ case DataType.ERROR:
+ throw new IOException("Unable to determine type of " + o.getClass());
+ default:
+ throw new IOException("Unable to find a converter for tuple field " + o);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/74c01ec2/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
index 8a17e8b..8462985 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
@@ -17,11 +17,8 @@
package org.apache.accumulo.pig;
import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
import java.util.Collection;
import java.util.Collections;
-import java.util.Map;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -30,18 +27,12 @@ import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
-import org.apache.pig.LoadStoreCaster;
-import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.Utf8StorageConverter;
-import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.util.ObjectSerializer;
-import org.joda.time.DateTime;
/**
* A LoadStoreFunc for retrieving data from and storing data to Accumulo.
@@ -60,9 +51,6 @@ import org.joda.time.DateTime;
*/
public class AccumuloKVStorage extends AbstractAccumuloStorage {
private static final Log LOG = LogFactory.getLog(AccumuloKVStorage.class);
- protected LoadStoreCaster caster;
-
- private ResourceSchema schema;
public AccumuloKVStorage() {
this.caster = new Utf8StorageConverter();
@@ -129,142 +117,4 @@ public class AccumuloKVStorage extends AbstractAccumuloStorage {
return Collections.singleton(mut);
}
- @Override
- public void checkSchema(ResourceSchema s) throws IOException {
- if (!(caster instanceof LoadStoreCaster)) {
- LOG.error("Caster must implement LoadStoreCaster for writing to Accumulo.");
- throw new IOException("Bad Caster " + caster.getClass());
- }
- schema = s;
- getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
- }
-
- private Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
- Object o = tuple.get(i);
- byte type = schemaToType(o, i, fieldSchemas);
-
- return objToText(o, type);
- }
-
- private byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
- return (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
- }
-
- private byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
- Object o = tuple.get(i);
- byte type = schemaToType(o, i, fieldSchemas);
-
- return objToBytes(o, type);
-
- }
-
- private long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
- Object o = tuple.get(i);
- byte type = schemaToType(o, i, fieldSchemas);
-
- switch (type) {
- case DataType.LONG:
- return (Long) o;
- case DataType.CHARARRAY:
- String timestampString = (String) o;
- try {
- return Long.parseLong(timestampString);
- } catch (NumberFormatException e) {
- final String msg = "Could not cast chararray into long: " + timestampString;
- LOG.error(msg);
- throw new IOException(msg, e);
- }
- case DataType.DOUBLE:
- Double doubleTimestamp = (Double) o;
- return doubleTimestamp.longValue();
- case DataType.FLOAT:
- Float floatTimestamp = (Float) o;
- return floatTimestamp.longValue();
- case DataType.INTEGER:
- Integer intTimestamp = (Integer) o;
- return intTimestamp.longValue();
- case DataType.BIGINTEGER:
- BigInteger bigintTimestamp = (BigInteger) o;
- long longTimestamp = bigintTimestamp.longValue();
-
- BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp);
-
- if (!recreatedTimestamp.equals(bigintTimestamp)) {
- LOG.warn("Downcasting BigInteger into Long results in a change of the original value. Was " + bigintTimestamp + " but is now " + longTimestamp);
- }
-
- return longTimestamp;
- case DataType.BIGDECIMAL:
- BigDecimal bigdecimalTimestamp = (BigDecimal) o;
- try {
- return bigdecimalTimestamp.longValueExact();
- } catch (ArithmeticException e) {
- long convertedLong = bigdecimalTimestamp.longValue();
- LOG.warn("Downcasting BigDecimal into Long results in a loss of information. Was " + bigdecimalTimestamp + " but is now " + convertedLong);
- return convertedLong;
- }
- case DataType.BYTEARRAY:
- DataByteArray bytes = (DataByteArray) o;
- try {
- return Long.parseLong(bytes.toString());
- } catch (NumberFormatException e) {
- final String msg = "Could not cast bytes into long: " + bytes.toString();
- LOG.error(msg);
- throw new IOException(msg, e);
- }
- default:
- LOG.error("Could not convert " + o + " of class " + o.getClass() + " into long.");
- throw new IOException("Could not convert " + o.getClass() + " into long");
-
- }
- }
-
- private Text objToText(Object o, byte type) throws IOException {
- return new Text(objToBytes(o, type));
- }
-
- @SuppressWarnings("unchecked")
- private byte[] objToBytes(Object o, byte type) throws IOException {
- if (o == null)
- return null;
- switch (type) {
- case DataType.BYTEARRAY:
- return ((DataByteArray) o).get();
- case DataType.BAG:
- return caster.toBytes((DataBag) o);
- case DataType.CHARARRAY:
- return caster.toBytes((String) o);
- case DataType.DOUBLE:
- return caster.toBytes((Double) o);
- case DataType.FLOAT:
- return caster.toBytes((Float) o);
- case DataType.INTEGER:
- return caster.toBytes((Integer) o);
- case DataType.LONG:
- return caster.toBytes((Long) o);
- case DataType.BIGINTEGER:
- return caster.toBytes((BigInteger) o);
- case DataType.BIGDECIMAL:
- return caster.toBytes((BigDecimal) o);
- case DataType.BOOLEAN:
- return caster.toBytes((Boolean) o);
- case DataType.DATETIME:
- return caster.toBytes((DateTime) o);
-
- // The type conversion here is unchecked.
- // Relying on DataType.findType to do the right thing.
- case DataType.MAP:
- return caster.toBytes((Map<String,Object>) o);
-
- case DataType.NULL:
- return null;
- case DataType.TUPLE:
- return caster.toBytes((Tuple) o);
- case DataType.ERROR:
- throw new IOException("Unable to determine type of " + o.getClass());
- default:
- throw new IOException("Unable to find a converter for tuple field " + o);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/74c01ec2/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
new file mode 100644
index 0000000..c72f07f
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -0,0 +1,134 @@
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+import com.google.common.collect.Lists;
+
+public class AccumuloStorage extends AbstractAccumuloStorage {
+ private static final Logger log = Logger.getLogger(AccumuloStorage.class);
+ private static final String COMMA = ",", COLON = ":";
+ private static final Text EMPTY_TEXT = new Text(new byte[0]);
+
+ protected final List<String> columnSpecs;
+
+ public AccumuloStorage(String columns) {
+ if (!StringUtils.isBlank(columns)) {
+ String[] columnArray = StringUtils.split(columns, COMMA);
+ columnSpecs = Lists.newArrayList(columnArray);
+ } else {
+ columnSpecs = Collections.emptyList();
+ }
+ }
+
+ @Override
+ protected Tuple getTuple(Key key, Value value) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
+ final ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
+
+ Iterator<Object> tupleIter = tuple.iterator();
+
+ if (1 <= tuple.size()) {
+ log.debug("Ignoring tuple of size " + tuple.size());
+ return Collections.emptyList();
+ }
+
+ Mutation mutation = new Mutation(objectToText(tupleIter.next(), (null == fieldSchemas) ? null : fieldSchemas[0]));
+
+ // TODO Can these be lifted up to members of the class instead of this method?
+ final Text _cfHolder = new Text(), _cqHolder = new Text();
+
+ int columnOffset = 0;
+ int tupleOffset = 1;
+ while (tupleIter.hasNext()) {
+ Object o = tupleIter.next();
+ String cf = null;
+
+ // Figure out if the user provided a specific columnfamily to use.
+ if (columnOffset < columnSpecs.size()) {
+ cf = columnSpecs.get(columnOffset);
+ }
+
+ // Grab the type for this field
+ byte type = schemaToType(o, fieldSchemas[tupleOffset]);
+
+ // If we have a Map, we want to treat every Entry as a column in this record
+ // placing said column in the column family unless this instance of AccumuloStorage
+ // was provided a specific columnFamily to use, in which case the entry's column is
+ // in the column qualifier.
+ if (DataType.MAP == type) {
+ @SuppressWarnings("unchecked")
+ Map<String,Object> map = (Map<String,Object>) o;
+
+ for (Entry<String,Object> entry : map.entrySet()) {
+ Object entryObject = entry.getValue();
+ byte entryType = DataType.findType(entryObject);
+
+ Value value = new Value(objToBytes(entryObject, entryType));
+
+ // If we have a CF, use it and push the Map's key down to the CQ
+ if (null != cf) {
+ _cfHolder.set(cf);
+ _cqHolder.set(entry.getKey());
+
+ mutation.put(_cfHolder, _cqHolder, value);
+ } else {
+ // Just put the Map's key into the CF
+ _cfHolder.set(entry.getKey());
+ mutation.put(_cfHolder, EMPTY_TEXT, value);
+ }
+ }
+ } else if (null == cf) {
+ // We don't know what column to place the value into
+ log.warn("Was provided no column family for non-Map entry in the tuple at offset " + tupleOffset);
+ } else {
+ Value value = new Value(objToBytes(o, type));
+
+ // We have something that isn't a Map, use the provided CF as a column name
+ // and then shove the value into the Value
+ int index = cf.indexOf(COLON);
+ if (-1 == index) {
+ _cfHolder.set(cf);
+
+ mutation.put(_cfHolder, EMPTY_TEXT, value);
+ } else {
+ byte[] cfBytes = cf.getBytes();
+ _cfHolder.set(cfBytes, 0, index);
+ _cqHolder.set(cfBytes, index+1, cfBytes.length - (index + 1));
+
+ mutation.put(_cfHolder, _cqHolder, value);
+ }
+ }
+
+ columnOffset++;
+ tupleOffset++;
+ }
+
+ if (0 == mutation.size()) {
+ return Collections.emptyList();
+ }
+
+ return Collections.singletonList(mutation);
+ }
+}
[05/10] git commit: ACCUMULO-1783 Add in some tests for deserializing
Key/Value back into Tuple, fixing some bugs along the way.
Posted by el...@apache.org.
ACCUMULO-1783 Add tests for deserializing Key/Value pairs back into
Tuples, fixing some bugs along the way.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/8c46d9b3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/8c46d9b3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/8c46d9b3
Branch: refs/heads/ACCUMULO-1783
Commit: 8c46d9b3262e3568dd613f80569d1f11944cf262
Parents: e227ad4
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 24 19:55:01 2013 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 24 19:55:01 2013 -0700
----------------------------------------------------------------------
.../apache/accumulo/pig/AccumuloStorage.java | 11 +-
.../accumulo/pig/AccumuloStorageTest.java | 112 +++++++++++++++++++
2 files changed, 120 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/8c46d9b3/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 3759181..030b0c3 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -67,6 +67,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
while (iter.hasNext()) {
if (null == currentEntry) {
currentEntry = iter.next();
+ aggregate.add(currentEntry);
} else {
Entry<Key,Value> nextEntry = iter.next();
@@ -75,11 +76,14 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
// Aggregate this entry into the map
aggregate.add(nextEntry);
} else {
+ currentEntry = nextEntry;
+
// Flush and start again
InternalMap map = aggregate(aggregate);
tupleEntries.add(map);
aggregate = Lists.newLinkedList();
+ aggregate.add(currentEntry);
}
}
}
@@ -92,7 +96,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
Tuple tuple = TupleFactory.getInstance().newTuple(tupleEntries.size() + 1);
tuple.set(0, new DataByteArray(key.getRow().getBytes()));
int i = 1;
- for (Object obj : tupleEntries) {
+ for (Object obj : tupleEntries) {
tuple.set(i, obj);
i++;
}
@@ -100,10 +104,11 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
return tuple;
}
- private InternalMap aggregate(List<Entry<Key,Value>> columns) {
+ private InternalMap aggregate(List<Entry<Key,Value>> columns) {
InternalMap map = new InternalMap();
for (Entry<Key,Value> column : columns) {
- map.put(column.getKey().getColumnQualifier().toString(), new DataByteArray(column.getValue().get()));
+ map.put(column.getKey().getColumnFamily().toString() + COLON + column.getKey().getColumnQualifier().toString(),
+ new DataByteArray(column.getValue().get()));
}
return map;
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/8c46d9b3/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
index 38d0260..10777a8 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -8,12 +8,18 @@ import java.util.Map;
import java.util.Map.Entry;
import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.InternalMap;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.junit.Assert;
import org.junit.Test;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class AccumuloStorageTest {
@@ -294,4 +300,110 @@ public class AccumuloStorageTest {
Assert.assertTrue("Did not find all expectations", expectations.isEmpty());
}
+ @Test
+ public void testSingleKey() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage();
+
+ List<Key> keys = Lists.newArrayList();
+ List<Value> values = Lists.newArrayList();
+
+ keys.add(new Key("1", "", "col1"));
+ values.add(new Value("value1".getBytes()));
+
+ Key k = new Key("1");
+ Value v = WholeRowIterator.encodeRow(keys, values);
+
+ Tuple t = storage.getTuple(k, v);
+
+ Assert.assertEquals(2, t.size());
+
+ Assert.assertEquals("1", t.get(0).toString());
+
+ InternalMap map = new InternalMap();
+ map.put(":col1", new DataByteArray("value1"));
+
+ Assert.assertEquals(map, t.get(1));
+ }
+
+ @Test
+ public void testSingleColumn() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage();
+
+ List<Key> keys = Lists.newArrayList();
+ List<Value> values = Lists.newArrayList();
+
+ keys.add(new Key("1", "col1", "cq1"));
+ keys.add(new Key("1", "col1", "cq2"));
+ keys.add(new Key("1", "col1", "cq3"));
+
+ values.add(new Value("value1".getBytes()));
+ values.add(new Value("value2".getBytes()));
+ values.add(new Value("value3".getBytes()));
+
+ Key k = new Key("1");
+ Value v = WholeRowIterator.encodeRow(keys, values);
+
+ Tuple t = storage.getTuple(k, v);
+
+ Assert.assertEquals(2, t.size());
+
+ Assert.assertEquals("1", t.get(0).toString());
+
+ InternalMap map = new InternalMap();
+ map.put("col1:cq1", new DataByteArray("value1"));
+ map.put("col1:cq2", new DataByteArray("value2"));
+ map.put("col1:cq3", new DataByteArray("value3"));
+
+ Assert.assertEquals(map, t.get(1));
+ }
+
+ @Test
+ public void testMultipleColumns() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage();
+
+ List<Key> keys = Lists.newArrayList();
+ List<Value> values = Lists.newArrayList();
+
+ keys.add(new Key("1", "col1", "cq1"));
+ keys.add(new Key("1", "col1", "cq2"));
+ keys.add(new Key("1", "col1", "cq3"));
+ keys.add(new Key("1", "col2", "cq1"));
+ keys.add(new Key("1", "col3", "cq1"));
+ keys.add(new Key("1", "col3", "cq2"));
+
+ values.add(new Value("value1".getBytes()));
+ values.add(new Value("value2".getBytes()));
+ values.add(new Value("value3".getBytes()));
+ values.add(new Value("value1".getBytes()));
+ values.add(new Value("value1".getBytes()));
+ values.add(new Value("value2".getBytes()));
+
+ Key k = new Key("1");
+ Value v = WholeRowIterator.encodeRow(keys, values);
+
+ Tuple t = storage.getTuple(k, v);
+
+ Assert.assertEquals(4, t.size());
+
+ Assert.assertEquals("1", t.get(0).toString());
+
+ InternalMap map = new InternalMap();
+ map.put("col1:cq1", new DataByteArray("value1"));
+ map.put("col1:cq2", new DataByteArray("value2"));
+ map.put("col1:cq3", new DataByteArray("value3"));
+
+ Assert.assertEquals(map, t.get(1));
+
+ map = new InternalMap();
+ map.put("col2:cq1", new DataByteArray("value1"));
+
+ Assert.assertEquals(map, t.get(2));
+
+ map = new InternalMap();
+ map.put("col3:cq1", new DataByteArray("value1"));
+ map.put("col3:cq2", new DataByteArray("value2"));
+
+ Assert.assertEquals(map, t.get(3));
+ }
+
}
[04/10] git commit: ACCUMULO-1783 Update tests for new functionality
and API changes
Posted by el...@apache.org.
ACCUMULO-1783 Update tests for new functionality and API changes
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/e227ad4a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/e227ad4a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/e227ad4a
Branch: refs/heads/ACCUMULO-1783
Commit: e227ad4ab6d4277644e2aecefb1e2c8dc3db22cb
Parents: 58e5a7e
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 24 19:31:00 2013 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 24 19:31:00 2013 -0700
----------------------------------------------------------------------
.../accumulo/pig/AbstractAccumuloStorage.java | 2 +-
.../apache/accumulo/pig/AccumuloStorage.java | 40 ++-
.../accumulo/pig/AccumuloStorageTest.java | 297 +++++++++++++++++++
3 files changed, 322 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e227ad4a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 494fd72..2361dcf 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -208,7 +208,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
if (columnFamilyColumnQualifierPairs.size() > 0) {
- LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+ LOG.debug("columns: " + columnFamilyColumnQualifierPairs);
AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e227ad4a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 9c8f002..3759181 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.InternalMap;
@@ -38,7 +39,13 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
protected final List<String> columnSpecs;
+ public AccumuloStorage() {
+ this("");
+ }
+
public AccumuloStorage(String columns) {
+ this.caster = new Utf8StorageConverter();
+
if (!StringUtils.isBlank(columns)) {
String[] columnArray = StringUtils.split(columns, COMMA);
columnSpecs = Lists.newArrayList(columnArray);
@@ -51,7 +58,6 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
protected Tuple getTuple(Key key, Value value) throws IOException {
SortedMap<Key,Value> rowKVs = WholeRowIterator.decodeRow(key, value);
- List<Tuple> columns = new ArrayList<Tuple>(rowKVs.size());
List<Object> tupleEntries = Lists.newLinkedList();
Iterator<Entry<Key,Value>> iter = rowKVs.entrySet().iterator();
@@ -103,16 +109,6 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
return map;
}
- private Tuple columnToTuple(Text colfam, Text colqual, Text colvis, long ts, Value val) throws IOException {
- Tuple tuple = TupleFactory.getInstance().newTuple(5);
- tuple.set(0, new DataByteArray(colfam.getBytes()));
- tuple.set(1, new DataByteArray(colqual.getBytes()));
- tuple.set(2, new DataByteArray(colvis.getBytes()));
- tuple.set(3, new Long(ts));
- tuple.set(4, new DataByteArray(val.get()));
- return tuple;
- }
-
protected void configureInputFormat(Configuration conf) {
AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
}
@@ -123,7 +119,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
Iterator<Object> tupleIter = tuple.iterator();
- if (1 <= tuple.size()) {
+ if (1 >= tuple.size()) {
log.debug("Ignoring tuple of size " + tuple.size());
return Collections.emptyList();
}
@@ -146,7 +142,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
}
// Grab the type for this field
- byte type = schemaToType(o, fieldSchemas[tupleOffset]);
+ byte type = schemaToType(o, (null == fieldSchemas) ? null : fieldSchemas[tupleOffset]);
// If we have a Map, we want to treat every Entry as a column in this record
// placing said column in the column family unless this instance of AccumuloStorage
@@ -164,10 +160,22 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
// If we have a CF, use it and push the Map's key down to the CQ
if (null != cf) {
- _cfHolder.set(cf);
- _cqHolder.set(entry.getKey());
+ int index = cf.indexOf(COLON);
- mutation.put(_cfHolder, _cqHolder, value);
+ // No colon in the provided column
+ if (-1 == index) {
+ _cfHolder.set(cf);
+ _cqHolder.set(entry.getKey());
+
+ mutation.put(_cfHolder, _cqHolder, value);
+ } else {
+ _cfHolder.set(cf.getBytes(), 0, index);
+
+ _cqHolder.set(cf.getBytes(), index + 1, cf.length() - (index + 1));
+ _cqHolder.append(entry.getKey().getBytes(), 0, entry.getKey().length());
+
+ mutation.put(_cfHolder, _cqHolder, value);
+ }
} else {
// Just put the Map's key into the CQ
_cqHolder.set(entry.getKey());
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e227ad4a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
new file mode 100644
index 0000000..38d0260
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -0,0 +1,297 @@
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class AccumuloStorageTest {
+
+ @Test
+ public void test1Tuple() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage();
+
+ Tuple t = TupleFactory.getInstance().newTuple(1);
+ t.set(0, "row");
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(0, mutations.size());
+ }
+
+ @Test
+ public void test2TupleNoColumn() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage();
+
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, "value");
+
+ Assert.assertEquals(0, storage.getMutations(t).size());
+ }
+
+ @Test
+ public void test2TupleWithColumn() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage("col");
+
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, "value");
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(1, colUpdates.size());
+
+ ColumnUpdate colUpdate = colUpdates.get(0);
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col".getBytes()));
+ Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value".getBytes()));
+ }
+
+ @Test
+ public void test2TupleWithColumnQual() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage("col:qual");
+
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, "value");
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(1, colUpdates.size());
+
+ ColumnUpdate colUpdate = colUpdates.get(0);
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes()));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "qual".getBytes()));
+ Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value".getBytes()));
+ }
+
+ @Test
+ public void test2TupleWithMixedColumns() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage("col1,col1:qual,col2:qual,col2");
+
+ Tuple t = TupleFactory.getInstance().newTuple(5);
+ t.set(0, "row");
+ t.set(1, "value1");
+ t.set(2, "value2");
+ t.set(3, "value3");
+ t.set(4, "value4");
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(4, colUpdates.size());
+
+ ColumnUpdate colUpdate = colUpdates.get(0);
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col1".getBytes()));
+ Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value1".getBytes()));
+
+ colUpdate = colUpdates.get(1);
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col1".getBytes()));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "qual".getBytes()));
+ Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value2".getBytes()));
+
+ colUpdate = colUpdates.get(2);
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col2".getBytes()));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "qual".getBytes()));
+ Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value3".getBytes()));
+
+ colUpdate = colUpdates.get(3);
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col2".getBytes()));
+ Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value4".getBytes()));
+ }
+
+ @Test
+ public void testIgnoredExtraColumns() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage("col");
+
+ Tuple t = TupleFactory.getInstance().newTuple(3);
+ t.set(0, "row");
+ t.set(1, "value1");
+ t.set(2, "value2");
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(1, colUpdates.size());
+
+ ColumnUpdate colUpdate = colUpdates.get(0);
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col".getBytes()));
+ Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value1".getBytes()));
+ }
+
+ @Test
+ public void testNonIgnoredExtraAsMap() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage("col");
+
+ Map<String,Object> map = Maps.newHashMap();
+
+ map.put("mapcol1", "mapval1");
+ map.put("mapcol2", "mapval2");
+ map.put("mapcol3", "mapval3");
+ map.put("mapcol4", "mapval4");
+
+ Tuple t = TupleFactory.getInstance().newTuple(3);
+ t.set(0, "row");
+ t.set(1, "value1");
+ t.set(2, map);
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(5, colUpdates.size());
+
+ Map<Entry<String,String>,String> expectations = Maps.newHashMap();
+ expectations.put(Maps.immutableEntry("", "col"), "value1");
+ expectations.put(Maps.immutableEntry("", "mapcol1"), "mapval1");
+ expectations.put(Maps.immutableEntry("", "mapcol2"), "mapval2");
+ expectations.put(Maps.immutableEntry("", "mapcol3"), "mapval3");
+ expectations.put(Maps.immutableEntry("", "mapcol4"), "mapval4");
+
+ for (ColumnUpdate update : colUpdates) {
+ Entry<String,String> key = Maps.immutableEntry(new String(update.getColumnFamily()), new String(update.getColumnQualifier()));
+ String value = new String(update.getValue());
+ Assert.assertTrue(expectations.containsKey(key));
+
+ String actual = expectations.remove(key);
+ Assert.assertEquals(value, actual);
+ }
+
+ Assert.assertTrue("Did not find all expectations", expectations.isEmpty());
+ }
+
+ @Test
+ public void testMapWithColFam() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage("col");
+
+ Map<String,Object> map = Maps.newHashMap();
+
+ map.put("mapcol1", "mapval1");
+ map.put("mapcol2", "mapval2");
+ map.put("mapcol3", "mapval3");
+ map.put("mapcol4", "mapval4");
+
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, map);
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(4, colUpdates.size());
+
+ Map<Entry<String,String>,String> expectations = Maps.newHashMap();
+ expectations.put(Maps.immutableEntry("col", "mapcol1"), "mapval1");
+ expectations.put(Maps.immutableEntry("col", "mapcol2"), "mapval2");
+ expectations.put(Maps.immutableEntry("col", "mapcol3"), "mapval3");
+ expectations.put(Maps.immutableEntry("col", "mapcol4"), "mapval4");
+
+ for (ColumnUpdate update : colUpdates) {
+ Entry<String,String> key = Maps.immutableEntry(new String(update.getColumnFamily()), new String(update.getColumnQualifier()));
+ String value = new String(update.getValue());
+ Assert.assertTrue(expectations.containsKey(key));
+
+ String actual = expectations.remove(key);
+ Assert.assertEquals(value, actual);
+ }
+
+ Assert.assertTrue("Did not find all expectations", expectations.isEmpty());
+ }
+
+ @Test
+ public void testMapWithColFamColQualPrefix() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage("col:qual_");
+
+ Map<String,Object> map = Maps.newHashMap();
+
+ map.put("mapcol1", "mapval1");
+ map.put("mapcol2", "mapval2");
+ map.put("mapcol3", "mapval3");
+ map.put("mapcol4", "mapval4");
+
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, map);
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(4, colUpdates.size());
+
+ Map<Entry<String,String>,String> expectations = Maps.newHashMap();
+ expectations.put(Maps.immutableEntry("col", "qual_mapcol1"), "mapval1");
+ expectations.put(Maps.immutableEntry("col", "qual_mapcol2"), "mapval2");
+ expectations.put(Maps.immutableEntry("col", "qual_mapcol3"), "mapval3");
+ expectations.put(Maps.immutableEntry("col", "qual_mapcol4"), "mapval4");
+
+ for (ColumnUpdate update : colUpdates) {
+ Entry<String,String> key = Maps.immutableEntry(new String(update.getColumnFamily()), new String(update.getColumnQualifier()));
+ String value = new String(update.getValue());
+ Assert.assertTrue(expectations.containsKey(key));
+
+ String actual = expectations.remove(key);
+ Assert.assertEquals(value, actual);
+ }
+
+ Assert.assertTrue("Did not find all expectations", expectations.isEmpty());
+ }
+
+}
[07/10] git commit: ACCUMULO-1783 Add some additional logging on AIF
setup
Posted by el...@apache.org.
ACCUMULO-1783 Add additional logging during AccumuloInputFormat (AIF) setup
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/291b9392
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/291b9392
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/291b9392
Branch: refs/heads/ACCUMULO-1783
Commit: 291b9392ae53b4e3943869e7b8041dc98560353b
Parents: ad03c51
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 30 22:16:35 2013 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Oct 30 22:16:35 2013 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/pig/AbstractAccumuloStorage.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/291b9392/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index c2345cc..37efe84 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -208,11 +208,16 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
if (columnFamilyColumnQualifierPairs.size() > 0) {
- LOG.debug("columns: " + columnFamilyColumnQualifierPairs);
+ LOG.info("columns: " + columnFamilyColumnQualifierPairs);
AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
}
- AccumuloInputFormat.setRanges(conf, Collections.singleton(new Range(start, end)));
+ Collection<Range> ranges = Collections.singleton(new Range(start, end));
+
+ LOG.info("Scanning Accumulo for " + ranges);
+
+ AccumuloInputFormat.setRanges(conf, ranges);
+
configureInputFormat(conf);
}
}