You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/10/31 04:25:13 UTC

[01/10] git commit: ACCUMULO-1783 Clean up the "typed" variant from before and supersede the original AccumuloStorage.

Updated Branches:
  refs/heads/ACCUMULO-1783 [created] 30fd9aa6c


ACCUMULO-1783 Clean up the "typed" variant from before and supersede the
original AccumuloStorage.

Following what HBaseStorage provides, the typical use case is
treating each tuple as a "row" and inserting the multiple columns from a
bag (or a similar structure) as columns in that row. To move towards this, I
renamed AccumuloStorage to AccumuloKVStorage to make it obvious
that this storage engine purely provides a Key/Value storage
mechanism and nothing more.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/294f9ce8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/294f9ce8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/294f9ce8

Branch: refs/heads/ACCUMULO-1783
Commit: 294f9ce8498db614dac198aaf62d3023c7d9b02d
Parents: c25f26c
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 23 16:48:37 2013 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Wed Oct 23 16:48:37 2013 -0700

----------------------------------------------------------------------
 .../accumulo/pig/AbstractAccumuloStorage.java   |  13 +-
 .../apache/accumulo/pig/AccumuloKVStorage.java  | 270 +++++++++++++++++++
 .../apache/accumulo/pig/AccumuloStorage.java    |  83 ------
 .../accumulo/pig/TypedAccumuloStorage.java      | 207 --------------
 .../accumulo/pig/AccumuloKVStorageTest.java     | 134 +++++++++
 .../accumulo/pig/AccumuloStorageTest.java       | 104 -------
 6 files changed, 416 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index d26cf40..0424b8a 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
@@ -46,6 +47,7 @@ import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
 
 /**
  * A LoadStoreFunc for retrieving data from and storing data to Accumulo
@@ -80,6 +82,8 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   long maxMutationBufferSize = 10 * 1000 * 1000;
   int maxLatency = 10 * 1000;
   
+  protected String contextSignature = null;
+  
   public AbstractAccumuloStorage() {}
   
   @Override
@@ -213,12 +217,19 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   
   @Override
   public void setUDFContextSignature(String signature) {
-    
+    // Remember the signature so getUDFProperties() scopes its UDFContext lookup to this instance.
+    this.contextSignature = signature;
   }
   
   /* StoreFunc methods */
   public void setStoreFuncUDFContextSignature(String signature) {
+    // Writes the same field as setUDFContextSignature, so whichever signature setter runs last wins.
+    // NOTE(review): confirm an instance is never used as both loader and storer with different signatures.
+    this.contextSignature = signature;
     
+  }  
+  /**
+   * Returns the UDF properties for this instance, looked up in the current UDFContext by this object's runtime class and
+   * <code>contextSignature</code>.
+   *
+   * <p>NOTE(review): <code>contextSignature</code> starts out null and is assigned by the signature setters; if this is
+   * called before either setter runs, the lookup is keyed on a null signature. Confirm callers never do that.</p>
+   *
+   * @return the UDFContext properties for this class/signature pair
+   */
+  protected Properties getUDFProperties() {
+    return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] {contextSignature});
   }
   
   public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
new file mode 100644
index 0000000..8a17e8b
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.pig.LoadStoreCaster;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.joda.time.DateTime;
+
+/**
+ * A LoadStoreFunc for retrieving data from and storing data to Accumulo.
+ * 
+ * <p>Each Key/Value pair is returned as a tuple: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray;
+ * timestamp is a long.</p>
+ * 
+ * <p>Tuples written to Accumulo require at least key, column family, column qualifier and value; a column visibility, or a column visibility and a
+ * timestamp, may also be provided:</p>
+ * 
+ * <ul>
+ * <li>(key, colfam, colqual, value)</li>
+ * <li>(key, colfam, colqual, colvis, value)</li>
+ * <li>(key, colfam, colqual, colvis, timestamp, value)</li>
+ * </ul>
+ * 
+ * <p>An empty or null column visibility is written without a visibility. Fields after the sixth are ignored.</p>
+ */
+public class AccumuloKVStorage extends AbstractAccumuloStorage {
+  private static final Log LOG = LogFactory.getLog(AccumuloKVStorage.class);
+  protected LoadStoreCaster caster;
+  
+  private ResourceSchema schema;
+  
+  public AccumuloKVStorage() {
+    this.caster = new Utf8StorageConverter();
+  }
+  
+  /**
+   * Wraps one Accumulo Key/Value pair in a 6-field tuple: (row, colfam, colqual, colvis, timestamp, value).
+   */
+  @Override
+  protected Tuple getTuple(Key key, Value value) throws IOException {
+    Tuple tuple = TupleFactory.getInstance().newTuple(6);
+    tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+    tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
+    tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
+    tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
+    tuple.set(4, Long.valueOf(key.getTimestamp()));
+    tuple.set(5, new DataByteArray(value.get()));
+    return tuple;
+  }
+  
+  /**
+   * Converts a tuple into a single {@link Mutation}. See the class documentation for the accepted tuple layouts.
+   * 
+   * @param tuple the tuple to write; must contain at least 4 fields
+   * @return a singleton collection holding the mutation
+   * @throws IOException if the tuple has fewer than 4 fields or a field cannot be converted
+   */
+  @Override
+  public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
+    final int size = tuple.size();
+    if (size < 4) {
+      throw new IOException("Expected at least 4 fields (key, colfam, colqual, value) but tuple has " + size);
+    }
+    
+    ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
+    
+    Mutation mut = new Mutation(tupleToText(tuple, 0, fieldSchemas));
+    Text cf = tupleToText(tuple, 1, fieldSchemas);
+    Text cq = tupleToText(tuple, 2, fieldSchemas);
+    
+    if (4 == size) {
+      mut.put(cf, cq, new Value(tupleToBytes(tuple, 3, fieldSchemas)));
+      return Collections.singleton(mut);
+    }
+    
+    if (6 < size) {
+      LOG.debug("Ignoring additional entries in tuple of length " + size);
+    }
+    
+    // Treat a null visibility like an empty one (no visibility) rather than failing inside Text's constructor.
+    Text cv = (tuple.get(3) == null) ? new Text() : tupleToText(tuple, 3, fieldSchemas);
+    
+    // 5 fields: (key, colfam, colqual, colvis, value); 6 or more: (key, colfam, colqual, colvis, timestamp, value)
+    boolean hasTimestamp = size >= 6;
+    long ts = hasTimestamp ? objToLong(tuple, 4, fieldSchemas) : 0L;
+    Value val = new Value(tupleToBytes(tuple, hasTimestamp ? 5 : 4, fieldSchemas));
+    
+    if (cv.getLength() == 0) {
+      if (hasTimestamp) {
+        // The caller-provided timestamp must be honored even when no visibility is given.
+        mut.put(cf, cq, ts, val);
+      } else {
+        mut.put(cf, cq, val);
+      }
+    } else if (hasTimestamp) {
+      mut.put(cf, cq, new ColumnVisibility(cv), ts, val);
+    } else {
+      mut.put(cf, cq, new ColumnVisibility(cv), val);
+    }
+    
+    return Collections.singleton(mut);
+  }
+  
+  /**
+   * Records the store schema so getMutations can convert fields by their declared types, and serializes it into the UDF
+   * properties under {@code <contextSignature>_schema}.
+   * 
+   * <p>NOTE(review): nothing in this class reads that property back, so on a backend instance {@code schema} may still be
+   * null and field types are then inferred with DataType.findType. Confirm whether prepareToWrite should restore it.</p>
+   * 
+   * @throws IOException if no LoadStoreCaster is configured
+   */
+  @Override
+  public void checkSchema(ResourceSchema s) throws IOException {
+    // caster is declared as LoadStoreCaster, so the only invalid state is a missing caster.
+    if (caster == null) {
+      LOG.error("Caster must implement LoadStoreCaster for writing to Accumulo.");
+      throw new IOException("Bad Caster: no LoadStoreCaster configured");
+    }
+    schema = s;
+    getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
+  }
+  
+  /** Converts field {@code i} of the tuple to Text using its schema (or inferred) type. */
+  private Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+    Object o = tuple.get(i);
+    byte type = schemaToType(o, i, fieldSchemas);
+    
+    return objToText(o, type);
+  }
+  
+  /**
+   * Determines the Pig type of field {@code i}: from the store schema when one is present and covers that position,
+   * otherwise by inspecting the value itself.
+   */
+  private byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
+    if (fieldSchemas == null || i >= fieldSchemas.length) {
+      return DataType.findType(o);
+    }
+    return fieldSchemas[i].getType();
+  }
+  
+  /** Converts field {@code i} of the tuple to bytes using its schema (or inferred) type. */
+  private byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+    Object o = tuple.get(i);
+    byte type = schemaToType(o, i, fieldSchemas);
+    
+    return objToBytes(o, type);
+  }
+  
+  /**
+   * Converts field {@code i} of the tuple into a long timestamp. Numeric types are narrowed (with a warning when the
+   * value changes); chararray and bytearray values are parsed as decimal longs.
+   * 
+   * @throws IOException if the field is null, unparseable, or of an unsupported type
+   */
+  private long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+    Object o = tuple.get(i);
+    if (o == null) {
+      throw new IOException("Timestamp field at position " + i + " must not be null");
+    }
+    byte type = schemaToType(o, i, fieldSchemas);
+    
+    switch (type) {
+      case DataType.LONG:
+        return (Long) o;
+      case DataType.CHARARRAY:
+        String timestampString = (String) o;
+        try {
+          return Long.parseLong(timestampString);
+        } catch (NumberFormatException e) {
+          final String msg = "Could not cast chararray into long: " + timestampString;
+          LOG.error(msg);
+          throw new IOException(msg, e);
+        }
+      case DataType.DOUBLE:
+        Double doubleTimestamp = (Double) o;
+        return doubleTimestamp.longValue();
+      case DataType.FLOAT:
+        Float floatTimestamp = (Float) o;
+        return floatTimestamp.longValue();
+      case DataType.INTEGER:
+        Integer intTimestamp = (Integer) o;
+        return intTimestamp.longValue();
+      case DataType.BIGINTEGER:
+        BigInteger bigintTimestamp = (BigInteger) o;
+        long longTimestamp = bigintTimestamp.longValue();
+        
+        BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp);
+        
+        if (!recreatedTimestamp.equals(bigintTimestamp)) {
+          LOG.warn("Downcasting BigInteger into Long results in a change of the original value. Was " + bigintTimestamp + " but is now " + longTimestamp);
+        }
+        
+        return longTimestamp;
+      case DataType.BIGDECIMAL:
+        BigDecimal bigdecimalTimestamp = (BigDecimal) o;
+        try {
+          return bigdecimalTimestamp.longValueExact();
+        } catch (ArithmeticException e) {
+          long convertedLong = bigdecimalTimestamp.longValue();
+          LOG.warn("Downcasting BigDecimal into Long results in a loss of information. Was " + bigdecimalTimestamp + " but is now " + convertedLong);
+          return convertedLong;
+        }
+      case DataType.BYTEARRAY:
+        DataByteArray bytes = (DataByteArray) o;
+        try {
+          return Long.parseLong(bytes.toString());
+        } catch (NumberFormatException e) {
+          final String msg = "Could not cast bytes into long: " + bytes.toString();
+          LOG.error(msg);
+          throw new IOException(msg, e);
+        }
+      default:
+        LOG.error("Could not convert " + o + " of class " + o.getClass() + " into long.");
+        throw new IOException("Could not convert " + o.getClass() + " into long");
+    }
+  }
+  
+  /** Converts a value of the given Pig type to Text via {@link #objToBytes(Object, byte)}. */
+  private Text objToText(Object o, byte type) throws IOException {
+    return new Text(objToBytes(o, type));
+  }
+  
+  /**
+   * Serializes a value of the given Pig type to bytes using {@link #caster}. DataByteArray values are passed through
+   * as-is; null values (and DataType.NULL) yield null.
+   * 
+   * @throws IOException if the type is unknown or has no converter
+   */
+  @SuppressWarnings("unchecked")
+  private byte[] objToBytes(Object o, byte type) throws IOException {
+    if (o == null)
+      return null;
+    switch (type) {
+      case DataType.BYTEARRAY:
+        return ((DataByteArray) o).get();
+      case DataType.BAG:
+        return caster.toBytes((DataBag) o);
+      case DataType.CHARARRAY:
+        return caster.toBytes((String) o);
+      case DataType.DOUBLE:
+        return caster.toBytes((Double) o);
+      case DataType.FLOAT:
+        return caster.toBytes((Float) o);
+      case DataType.INTEGER:
+        return caster.toBytes((Integer) o);
+      case DataType.LONG:
+        return caster.toBytes((Long) o);
+      case DataType.BIGINTEGER:
+        return caster.toBytes((BigInteger) o);
+      case DataType.BIGDECIMAL:
+        return caster.toBytes((BigDecimal) o);
+      case DataType.BOOLEAN:
+        return caster.toBytes((Boolean) o);
+      case DataType.DATETIME:
+        return caster.toBytes((DateTime) o);
+        
+        // The type conversion here is unchecked.
+        // Relying on DataType.findType to do the right thing.
+      case DataType.MAP:
+        return caster.toBytes((Map<String,Object>) o);
+        
+      case DataType.NULL:
+        return null;
+      case DataType.TUPLE:
+        return caster.toBytes((Tuple) o);
+      case DataType.ERROR:
+        throw new IOException("Unable to determine type of " + o.getClass());
+      default:
+        throw new IOException("Unable to find a converter for tuple field " + o);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
deleted file mode 100644
index 15b1c47..0000000
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.accumulo.pig;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
-/**
- * A LoadStoreFunc for retrieving data from and storing data to Accumulo
- * 
- * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a
- * long.
- * 
- * Tuples can be written in 2 forms: (key, colfam, colqual, colvis, value) OR (key, colfam, colqual, value)
- * 
- */
-public class AccumuloStorage extends AbstractAccumuloStorage {
-  private static final Log LOG = LogFactory.getLog(AccumuloStorage.class);
-  
-  public AccumuloStorage() {}
-  
-  @Override
-  protected Tuple getTuple(Key key, Value value) throws IOException {
-    // and wrap it in a tuple
-    Tuple tuple = TupleFactory.getInstance().newTuple(6);
-    tuple.set(0, new DataByteArray(key.getRow().getBytes()));
-    tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
-    tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
-    tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
-    tuple.set(4, new Long(key.getTimestamp()));
-    tuple.set(5, new DataByteArray(value.get()));
-    return tuple;
-  }
-  
-  @Override
-  public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
-    Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
-    Text cf = Utils.objToText(tuple.get(1));
-    Text cq = Utils.objToText(tuple.get(2));
-    
-    if (tuple.size() > 4) {
-      Text cv = Utils.objToText(tuple.get(3));
-      Value val = new Value(Utils.objToBytes(tuple.get(4)));
-      if (cv.getLength() == 0) {
-        mut.put(cf, cq, val);
-      } else {
-        mut.put(cf, cq, new ColumnVisibility(cv), val);
-      }
-    } else {
-      Value val = new Value(Utils.objToBytes(tuple.get(3)));
-      mut.put(cf, cq, val);
-    }
-    
-    return Collections.singleton(mut);
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java
deleted file mode 100644
index 30c39c9..0000000
--- a/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.accumulo.pig;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
-import org.apache.pig.LoadStoreCaster;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.LoadPushDown.RequiredFieldList;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.builtin.Utf8StorageConverter;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.impl.util.UDFContext;
-import org.joda.time.DateTime;
-
-/**
- * A LoadStoreFunc for retrieving data from and storing data to Accumulo
- * 
- * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a
- * long.
- * 
- * Tuples can be written in 2 forms: (key, colfam, colqual, colvis, value) OR (key, colfam, colqual, value)
- * 
- */
-public class TypedAccumuloStorage extends AbstractAccumuloStorage {
-  private static final Log LOG = LogFactory.getLog(TypedAccumuloStorage.class);
-  protected LoadStoreCaster caster;
-  protected String contextSignature = null;
-  
-  private ResourceSchema schema_;
-  private RequiredFieldList requiredFieldList;
-  
-  public TypedAccumuloStorage() {
-    this.caster = new Utf8StorageConverter();
-  }
-  
-  @Override
-  protected Tuple getTuple(Key key, Value value) throws IOException {
-    // and wrap it in a tuple
-    Tuple tuple = TupleFactory.getInstance().newTuple(6);
-    tuple.set(0, new DataByteArray(key.getRow().getBytes()));
-    tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
-    tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
-    tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
-    tuple.set(4, new Long(key.getTimestamp()));
-    tuple.set(5, new DataByteArray(value.get()));
-    return tuple;
-  }
-  
-  @Override
-  public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
-    ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : schema_.getFields();
-    
-    Text t = tupleToText(tuple, 0, fieldSchemas);
-    
-    Mutation mut = new Mutation(t);
-    Text cf = tupleToText(tuple, 1, fieldSchemas);
-    Text cq = tupleToText(tuple, 2, fieldSchemas);
-    
-    if (tuple.size() > 4) {
-      Text cv = tupleToText(tuple, 3, fieldSchemas);
-      
-      byte[] valueBytes = tupleToBytes(tuple, 4, fieldSchemas);
-      
-      Value val = new Value(valueBytes);
-      if (cv.getLength() == 0) {
-        mut.put(cf, cq, val);
-      } else {
-        mut.put(cf, cq, new ColumnVisibility(cv), val);
-      }
-    } else {
-      byte[] valueBytes = tupleToBytes(tuple, 3, fieldSchemas);
-      Value val = new Value(valueBytes);
-      mut.put(cf, cq, val);
-    }
-    
-    return Collections.singleton(mut);
-  }
-  
-  @Override
-  public void setUDFContextSignature(String signature) {
-    this.contextSignature = signature;
-  }
-  
-  @Override
-  public void setStoreFuncUDFContextSignature(String signature) {
-    this.contextSignature = signature;
-  }
-  
-  /**
-   * Returns UDFProperties based on <code>contextSignature</code>.
-   */
-  private Properties getUDFProperties() {
-    return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] {contextSignature});
-  }
-  
-  @Override
-  public void checkSchema(ResourceSchema s) throws IOException {
-    if (!(caster instanceof LoadStoreCaster)) {
-      LOG.error("Caster must implement LoadStoreCaster for writing to HBase.");
-      throw new IOException("Bad Caster " + caster.getClass());
-    }
-    schema_ = s;
-    getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema_));
-  }
-  
-  private Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
-    Object o = tuple.get(i);
-    byte type = schemaToType(o, i, fieldSchemas);
-    
-    return objToText(o, type);
-  }
-  
-  private byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
-    return (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
-  }
-  
-  private byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
-    Object o = tuple.get(i);
-    byte type = schemaToType(o, i, fieldSchemas);
-    
-    return objToBytes(o, type);
-    
-  }
-  
-  private Text objToText(Object o, byte type) throws IOException {
-    return new Text(objToBytes(o, type));
-  }
-  
-  @SuppressWarnings("unchecked")
-  private byte[] objToBytes(Object o, byte type) throws IOException {
-    if (o == null)
-      return null;
-    switch (type) {
-      case DataType.BYTEARRAY:
-        return ((DataByteArray) o).get();
-      case DataType.BAG:
-        return caster.toBytes((DataBag) o);
-      case DataType.CHARARRAY:
-        return caster.toBytes((String) o);
-      case DataType.DOUBLE:
-        return caster.toBytes((Double) o);
-      case DataType.FLOAT:
-        return caster.toBytes((Float) o);
-      case DataType.INTEGER:
-        return caster.toBytes((Integer) o);
-      case DataType.LONG:
-        return caster.toBytes((Long) o);
-      case DataType.BIGINTEGER:
-        return caster.toBytes((BigInteger) o);
-      case DataType.BIGDECIMAL:
-        return caster.toBytes((BigDecimal) o);
-      case DataType.BOOLEAN:
-        return caster.toBytes((Boolean) o);
-      case DataType.DATETIME:
-        return caster.toBytes((DateTime) o);
-        
-        // The type conversion here is unchecked.
-        // Relying on DataType.findType to do the right thing.
-      case DataType.MAP:
-        return caster.toBytes((Map<String,Object>) o);
-        
-      case DataType.NULL:
-        return null;
-      case DataType.TUPLE:
-        return caster.toBytes((Tuple) o);
-      case DataType.ERROR:
-        throw new IOException("Unable to determine type of " + o.getClass());
-      default:
-        throw new IOException("Unable to find a converter for tuple field " + o);
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/test/java/org/apache/accumulo/pig/AccumuloKVStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloKVStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloKVStorageTest.java
new file mode 100644
index 0000000..8adbb52
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloKVStorageTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.pig;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AccumuloKVStorageTest {
+  
+  /**
+   * Asserts that {@code muts} holds exactly one Mutation carrying exactly one ColumnUpdate, and returns that Mutation.
+   */
+  private static Mutation assertSingleUpdate(Collection<Mutation> muts) {
+    assertNotNull(muts);
+    assertEquals(1, muts.size());
+    Mutation mut = muts.iterator().next();
+    List<ColumnUpdate> updates = mut.getUpdates();
+    assertEquals(1, updates.size());
+    return mut;
+  }
+  
+  /**
+   * Asserts that {@code actual} equals the bytes of the String stored at position {@code i} of {@code tuple}.
+   * assertArrayEquals reports the differing index on failure, unlike assertTrue(Arrays.equals(...)).
+   */
+  private static void assertFieldEquals(Tuple tuple, int i, byte[] actual) throws Exception {
+    Assert.assertArrayEquals(((String) tuple.get(i)).getBytes(), actual);
+  }
+  
+  @Test
+  public void testGetMutations4() throws Exception {
+    AccumuloKVStorage s = new AccumuloKVStorage();
+    
+    Tuple tuple = TupleFactory.getInstance().newTuple(4);
+    tuple.set(0, "row1");
+    tuple.set(1, "cf1");
+    tuple.set(2, "cq1");
+    tuple.set(3, "val1");
+    
+    Mutation mut = assertSingleUpdate(s.getMutations(tuple));
+    ColumnUpdate update = mut.getUpdates().get(0);
+    
+    assertFieldEquals(tuple, 0, mut.getRow());
+    assertFieldEquals(tuple, 1, update.getColumnFamily());
+    assertFieldEquals(tuple, 2, update.getColumnQualifier());
+    assertFieldEquals(tuple, 3, update.getValue());
+    Assert.assertArrayEquals(new byte[0], update.getColumnVisibility());
+  }
+  
+  @Test
+  public void testGetMutations5() throws Exception {
+    AccumuloKVStorage s = new AccumuloKVStorage();
+    
+    Tuple tuple = TupleFactory.getInstance().newTuple(5);
+    tuple.set(0, "row1");
+    tuple.set(1, "cf1");
+    tuple.set(2, "cq1");
+    tuple.set(3, "cv1");
+    tuple.set(4, "val1");
+    
+    Mutation mut = assertSingleUpdate(s.getMutations(tuple));
+    ColumnUpdate update = mut.getUpdates().get(0);
+    
+    assertFieldEquals(tuple, 0, mut.getRow());
+    assertFieldEquals(tuple, 1, update.getColumnFamily());
+    assertFieldEquals(tuple, 2, update.getColumnQualifier());
+    assertFieldEquals(tuple, 3, update.getColumnVisibility());
+    assertFieldEquals(tuple, 4, update.getValue());
+  }
+  
+  @Test
+  public void testGetMutations6() throws Exception {
+    AccumuloKVStorage s = new AccumuloKVStorage();
+    
+    Tuple tuple = TupleFactory.getInstance().newTuple(6);
+    tuple.set(0, "row");
+    tuple.set(1, "cf");
+    tuple.set(2, "cq");
+    tuple.set(3, "cv");
+    tuple.set(4, Long.valueOf(1L));
+    tuple.set(5, "value");
+    
+    Mutation mut = assertSingleUpdate(s.getMutations(tuple));
+    ColumnUpdate update = mut.getUpdates().get(0);
+    
+    assertFieldEquals(tuple, 0, mut.getRow());
+    assertFieldEquals(tuple, 1, update.getColumnFamily());
+    assertFieldEquals(tuple, 2, update.getColumnQualifier());
+    assertFieldEquals(tuple, 3, update.getColumnVisibility());
+    assertTrue(update.hasTimestamp());
+    assertEquals(((Long) tuple.get(4)).longValue(), update.getTimestamp());
+    assertFieldEquals(tuple, 5, update.getValue());
+  }
+  
+  @Test
+  public void testGetTuple() throws Exception {
+    AccumuloKVStorage s = new AccumuloKVStorage();
+    
+    Key key = new Key("row1", "cf1", "cq1", "cv1", 1024L);
+    Value value = new Value("val1".getBytes());
+    TestUtils.assertKeyValueEqualsTuple(key, value, s.getTuple(key, value));
+    
+    key = new Key("row1", "cf1", "cq1");
+    value = new Value("val1".getBytes());
+    TestUtils.assertKeyValueEqualsTuple(key, value, s.getTuple(key, value));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
deleted file mode 100644
index fbd68c6..0000000
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.accumulo.pig;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.accumulo.core.data.ColumnUpdate;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.junit.Test;
-
-public class AccumuloStorageTest {
-  
-  @Test
-  public void testGetMutations4() throws Exception {
-    AccumuloStorage s = new AccumuloStorage();
-    
-    Tuple tuple = TupleFactory.getInstance().newTuple(4);
-    tuple.set(0, "row1");
-    tuple.set(1, "cf1");
-    tuple.set(2, "cq1");
-    tuple.set(3, "val1");
-    
-    Collection<Mutation> muts = s.getMutations(tuple);
-    
-    assertNotNull(muts);
-    assertEquals(1, muts.size());
-    Mutation mut = muts.iterator().next();
-    List<ColumnUpdate> updates = mut.getUpdates();
-    assertEquals(1, updates.size());
-    ColumnUpdate update = updates.get(0);
-    
-    assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), mut.getRow()));
-    assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), update.getColumnFamily()));
-    assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), update.getColumnQualifier()));
-    assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), update.getValue()));
-    assertTrue(Arrays.equals("".getBytes(), update.getColumnVisibility()));
-  }
-  
-  @Test
-  public void testGetMutations5() throws Exception {
-    AccumuloStorage s = new AccumuloStorage();
-    
-    Tuple tuple = TupleFactory.getInstance().newTuple(5);
-    tuple.set(0, "row1");
-    tuple.set(1, "cf1");
-    tuple.set(2, "cq1");
-    tuple.set(3, "cv1");
-    tuple.set(4, "val1");
-    
-    Collection<Mutation> muts = s.getMutations(tuple);
-    
-    assertNotNull(muts);
-    assertEquals(1, muts.size());
-    Mutation mut = muts.iterator().next();
-    List<ColumnUpdate> updates = mut.getUpdates();
-    assertEquals(1, updates.size());
-    ColumnUpdate update = updates.get(0);
-    
-    assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), mut.getRow()));
-    assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), update.getColumnFamily()));
-    assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), update.getColumnQualifier()));
-    assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), update.getColumnVisibility()));
-    assertTrue(Arrays.equals(((String) tuple.get(4)).getBytes(), update.getValue()));
-  }
-  
-  @Test
-  public void testGetTuple() throws Exception {
-    AccumuloStorage s = new AccumuloStorage();
-    
-    Key key = new Key("row1", "cf1", "cq1", "cv1", 1024L);
-    Value value = new Value("val1".getBytes());
-    Tuple tuple = s.getTuple(key, value);
-    TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
-    
-    key = new Key("row1", "cf1", "cq1");
-    value = new Value("val1".getBytes());
-    tuple = s.getTuple(key, value);
-    TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
-  }
-}


[08/10] git commit: ACCUMULO-1783 Update the Load impl for the store changes

Posted by el...@apache.org.
ACCUMULO-1783 Update the Load impl for the store changes

For now, add a bool option on the constructor to say whether or not
entries with the same column family in a row should be aggregated together
into one Map. Removes the implicit rules I had on the cf at storage time.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/e0d3ade8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/e0d3ade8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/e0d3ade8

Branch: refs/heads/ACCUMULO-1783
Commit: e0d3ade8ef858f4299bb93f81fb4e52cf18b7de2
Parents: 291b939
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 30 22:17:39 2013 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Oct 30 22:17:39 2013 -0400

----------------------------------------------------------------------
 .../apache/accumulo/pig/AccumuloStorage.java    | 15 ++++++-
 .../accumulo/pig/AccumuloStorageTest.java       | 45 +++++++++++++++++++-
 2 files changed, 56 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e0d3ade8/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 97fb44f..cccba64 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -39,6 +39,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   public static final String METADATA_SUFFIX = "_metadata";
   
   protected final List<String> columnSpecs;
+  protected final boolean aggregateColfams;
   
   // Not sure if AccumuloStorage instances need to be thread-safe or not
   final Text _cfHolder = new Text(), _cqHolder = new Text();
@@ -47,8 +48,17 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     this("");
   }
   
+  public AccumuloStorage(boolean aggregateColfams) {
+    this("", aggregateColfams);
+  }
+  
   public AccumuloStorage(String columns) {
+    this(columns, false);
+  }
+  
+  public AccumuloStorage(String columns, boolean aggregateColfams) {
     this.caster = new Utf8StorageConverter();
+    this.aggregateColfams = aggregateColfams;
     
     // TODO It would be nice to have some other means than enumerating
     // the CF for every column in the Tuples we're going process
@@ -77,8 +87,8 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
       } else {
         Entry<Key,Value> nextEntry = iter.next();
         
-        // If we have the same colfam
-        if (currentEntry.getKey().equals(nextEntry.getKey(), PartialKey.ROW_COLFAM)) {
+        // If we're not aggregating colfams together, or we are and we have the same colfam
+        if (!aggregateColfams || currentEntry.getKey().equals(nextEntry.getKey(), PartialKey.ROW_COLFAM)) {
           // Aggregate this entry into the map
           aggregate.add(nextEntry);
         } else {
@@ -208,6 +218,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
    */
   protected void addColumn(Mutation mutation, String columnDef, String columnName, Value columnValue) {
     if (null == columnDef && null == columnName) {
+      // TODO Emit a counter here somehow?
       log.warn("Was provided no name or definition for column. Ignoring value");
       return;
     }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e0d3ade8/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
index db80c47..a02ad7c 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -358,8 +358,8 @@ public class AccumuloStorageTest {
   }
   
   @Test
-  public void testMultipleColumns() throws IOException {
-    AccumuloStorage storage = new AccumuloStorage();
+  public void testMultipleColumnsAggregateColfams() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage(true);
     
     List<Key> keys = Lists.newArrayList();
     List<Value> values = Lists.newArrayList();
@@ -406,4 +406,45 @@ public class AccumuloStorageTest {
     Assert.assertEquals(map, t.get(3));
   }
   
+  @Test
+  public void testMultipleColumnsNoColfamAggregate() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage(false);
+    
+    List<Key> keys = Lists.newArrayList();
+    List<Value> values = Lists.newArrayList();
+    
+    keys.add(new Key("1", "col1", "cq1"));
+    keys.add(new Key("1", "col1", "cq2"));
+    keys.add(new Key("1", "col1", "cq3"));
+    keys.add(new Key("1", "col2", "cq1"));
+    keys.add(new Key("1", "col3", "cq1"));
+    keys.add(new Key("1", "col3", "cq2"));
+    
+    values.add(new Value("value1".getBytes()));
+    values.add(new Value("value2".getBytes()));
+    values.add(new Value("value3".getBytes()));
+    values.add(new Value("value1".getBytes()));
+    values.add(new Value("value1".getBytes()));
+    values.add(new Value("value2".getBytes()));
+    
+    Key k = new Key("1");
+    Value v = WholeRowIterator.encodeRow(keys, values);
+    
+    Tuple t = storage.getTuple(k, v);
+    
+    Assert.assertEquals(2, t.size());
+    
+    Assert.assertEquals("1", t.get(0).toString());
+    
+    InternalMap map = new InternalMap();
+    map.put("col1:cq1", new DataByteArray("value1"));
+    map.put("col1:cq2", new DataByteArray("value2"));
+    map.put("col1:cq3", new DataByteArray("value3"));
+    map.put("col2:cq1", new DataByteArray("value1"));
+    map.put("col3:cq1", new DataByteArray("value1"));
+    map.put("col3:cq2", new DataByteArray("value2"));
+    
+    Assert.assertEquals(map, t.get(1));
+  }
+  
 }


[10/10] git commit: ACCUMULO-1783 Ensure that an empty colqual doesn't add an extra colon

Posted by el...@apache.org.
ACCUMULO-1783 Ensure that an empty colqual doesn't add an extra colon


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/30fd9aa6
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/30fd9aa6
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/30fd9aa6

Branch: refs/heads/ACCUMULO-1783
Commit: 30fd9aa6c81f41f6d126b01a7e37c9f98f92e338
Parents: 9279c77
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 30 23:18:16 2013 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Oct 30 23:24:28 2013 -0400

----------------------------------------------------------------------
 .../apache/accumulo/pig/AccumuloStorage.java    | 18 +++++++++++++----
 .../accumulo/pig/AccumuloStorageTest.java       | 21 +++++++++++++++++---
 2 files changed, 32 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/30fd9aa6/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 1788997..dcfd888 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -121,11 +121,21 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     return tuple;
   }
   
-  private Map<String,Object> aggregate(List<Entry<Key,Value>> columns) {
-    Map<String,Object> map = new HashMap<String,Object>();
+  protected Map<String,Object> aggregate(List<Entry<Key,Value>> columns) {
+    final Map<String,Object> map = new HashMap<String,Object>();
+    final StringBuilder sb = new StringBuilder(128);
+    
     for (Entry<Key,Value> column : columns) {
-      map.put(column.getKey().getColumnFamily().toString() + COLON + column.getKey().getColumnQualifier().toString(),
-          new DataByteArray(column.getValue().get()));
+      String cf = column.getKey().getColumnFamily().toString(), cq = column.getKey().getColumnQualifier().toString();
+      
+      sb.append(cf);
+      if (!cq.isEmpty()) {
+        sb.append(COLON).append(cq);
+      }
+      
+      map.put(sb.toString(), new DataByteArray(column.getValue().get()));
+      
+      sb.setLength(0);
     }
     
     return map;

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/30fd9aa6/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
index a02ad7c..aa88191 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -204,7 +204,7 @@ public class AccumuloStorageTest {
       Entry<String,String> key = Maps.immutableEntry(new String(update.getColumnFamily()), new String(update.getColumnQualifier()));
       String value = new String(update.getValue());
       Assert.assertTrue(expectations.containsKey(key));
-     
+      
       String actual = expectations.remove(key);
       Assert.assertEquals(value, actual);
     }
@@ -248,7 +248,7 @@ public class AccumuloStorageTest {
       Entry<String,String> key = Maps.immutableEntry(new String(update.getColumnFamily()), new String(update.getColumnQualifier()));
       String value = new String(update.getValue());
       Assert.assertTrue(expectations.containsKey(key));
-     
+      
       String actual = expectations.remove(key);
       Assert.assertEquals(value, actual);
     }
@@ -292,7 +292,7 @@ public class AccumuloStorageTest {
       Entry<String,String> key = Maps.immutableEntry(new String(update.getColumnFamily()), new String(update.getColumnQualifier()));
       String value = new String(update.getValue());
       Assert.assertTrue(expectations.containsKey(key));
-     
+      
       String actual = expectations.remove(key);
       Assert.assertEquals(value, actual);
     }
@@ -447,4 +447,19 @@ public class AccumuloStorageTest {
     Assert.assertEquals(map, t.get(1));
   }
   
+  @Test
+  public void testNoExtraCharsOnAggregate() throws Exception {
+    List<Entry<Key,Value>> input = Arrays.asList(Maps.immutableEntry(new Key("1", "cf1"), new Value("foo".getBytes())),
+        Maps.immutableEntry(new Key("1", "cf2"), new Value("bar".getBytes())));
+    
+    AccumuloStorage storage = new AccumuloStorage();
+    
+    Map<String,Object> aggregate = storage.aggregate(input);
+    
+    Assert.assertTrue(aggregate.containsKey("cf1"));
+    Assert.assertTrue(aggregate.containsKey("cf2"));
+    Assert.assertEquals("foo", aggregate.get("cf1").toString());
+    Assert.assertEquals("bar", aggregate.get("cf2").toString());
+  }
+  
 }


[06/10] git commit: ACCUMULO-1783 Reworking the storage side.

Posted by el...@apache.org.
ACCUMULO-1783 Reworking the storage side.

Took an approach like HBase's for the "regular" AccumuloStorage class.
Normal tuples are treated as a row, with the first entry being the
rowkey and subsequent entries being column values. Maps are expanded
into column:value pairs; any scalars, bags, or tuples require a column
to be specified in the AccumuloStorage constructor. Adds unit tests
for the functionality.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/ad03c51b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/ad03c51b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/ad03c51b

Branch: refs/heads/ACCUMULO-1783
Commit: ad03c51b3e5ee8baea76c0df6a4ca7c8df2b0606
Parents: 8c46d9b
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 30 20:24:06 2013 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Oct 30 20:24:06 2013 -0400

----------------------------------------------------------------------
 pom.xml                                         |  18 +++
 .../accumulo/pig/AbstractAccumuloStorage.java   |   9 +-
 .../apache/accumulo/pig/AccumuloKVStorage.java  |   6 +-
 .../apache/accumulo/pig/AccumuloStorage.java    | 121 +++++++++++--------
 .../java/org/apache/accumulo/pig/FORMAT.java    |  25 ++++
 .../accumulo/pig/AccumuloStorageTest.java       |  18 +--
 6 files changed, 134 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 630d5e2..b096123 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,6 +51,24 @@
       <artifactId>guava</artifactId>
       <version>15.0</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.4.5</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-minicluster</artifactId>
+      <version>1.4.4</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pig</groupId>
+      <artifactId>pigunit</artifactId>
+      <version>0.12.0</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   
 </project>

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 2361dcf..c2345cc 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -394,7 +394,14 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   }
   
   protected Text objToText(Object o, byte type) throws IOException {
-    return new Text(objToBytes(o, type));
+    byte[] bytes = objToBytes(o, type);
+    
+    if (null == bytes) {
+      LOG.warn("Creating empty text from null value");
+      return new Text();
+    }
+    
+    return new Text(bytes);
   }
   
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
index 8462985..13b34ce 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
@@ -44,9 +44,9 @@ import org.apache.pig.data.TupleFactory;
  * provided:</p>
  * 
  * <ul>
- * <li>(key, colfam, colqual, value)</li>
- * <li>(key, colfam, colqual, colvis, value)</li>
- * <li>(key, colfam, colqual, colvis, timestamp, value)</li> 
+ * <li>(row, colfam, colqual, value)</li>
+ * <li>(row, colfam, colqual, colvis, value)</li>
+ * <li>(row, colfam, colqual, colvis, timestamp, value)</li> 
  * </ul>
  */
 public class AccumuloKVStorage extends AbstractAccumuloStorage {

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 030b0c3..97fb44f 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -1,7 +1,6 @@
 package org.apache.accumulo.pig;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -37,8 +36,13 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   private static final String COMMA = ",", COLON = ":";
   private static final Text EMPTY_TEXT = new Text(new byte[0]);
   
+  public static final String METADATA_SUFFIX = "_metadata";
+  
   protected final List<String> columnSpecs;
   
+  // Not sure if AccumuloStorage instances need to be thread-safe or not
+  final Text _cfHolder = new Text(), _cqHolder = new Text();
+  
   public AccumuloStorage() {
     this("");
   }
@@ -46,6 +50,8 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   public AccumuloStorage(String columns) {
     this.caster = new Utf8StorageConverter();
     
+    // TODO It would be nice to have some other means than enumerating
+    // the CF for every column in the Tuples we're going process
     if (!StringUtils.isBlank(columns)) {
       String[] columnArray = StringUtils.split(columns, COMMA);
       columnSpecs = Lists.newArrayList(columnArray);
@@ -121,7 +127,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   @Override
   public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
     final ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
-    
+      
     Iterator<Object> tupleIter = tuple.iterator();
     
     if (1 >= tuple.size()) {
@@ -129,25 +135,21 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
       return Collections.emptyList();
     }
     
-    Mutation mutation = new Mutation(objectToText(tupleIter.next(), (null == fieldSchemas) ? null : fieldSchemas[0]));
-    
-    // TODO Can these be lifted up to members of the class instead of this method?
-    // Not sure if AccumuloStorage instances need to be thread-safe or not
-    final Text _cfHolder = new Text(), _cqHolder = new Text();
+    Mutation mutation = new Mutation(objectToText(tupleIter.next(), (null == fieldSchemas) ? null : fieldSchemas[0]));    
     
     int columnOffset = 0;
     int tupleOffset = 1;
     while (tupleIter.hasNext()) {
       Object o = tupleIter.next();
-      String cf = null;
+      String family = null;
       
       // Figure out if the user provided a specific columnfamily to use.
       if (columnOffset < columnSpecs.size()) {
-        cf = columnSpecs.get(columnOffset);
+        family = columnSpecs.get(columnOffset);
       }
       
       // Grab the type for this field
-      byte type = schemaToType(o, (null == fieldSchemas) ? null : fieldSchemas[tupleOffset]);
+      final byte type = schemaToType(o, (null == fieldSchemas) ? null : fieldSchemas[tupleOffset]);
       
       // If we have a Map, we want to treat every Entry as a column in this record
       // placing said column in the column family unless this instance of AccumuloStorage
@@ -159,53 +161,25 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
         
         for (Entry<String,Object> entry : map.entrySet()) {
           Object entryObject = entry.getValue();
-          byte entryType = DataType.findType(entryObject);
-          
-          Value value = new Value(objToBytes(entryObject, entryType));
           
-          // If we have a CF, use it and push the Map's key down to the CQ
-          if (null != cf) {
-            int index = cf.indexOf(COLON);
+          // Treat a null value in the map as the lack of this column
+          // The input may have come from a structured source where the
+          // column could not have been omitted. We can handle the lack of the column
+          if (null != entryObject) {
+            byte entryType = DataType.findType(entryObject);
+            Value value = new Value(objToBytes(entryObject, entryType));
             
-            // No colon in the provided column
-            if (-1 == index) {
-              _cfHolder.set(cf);
-              _cqHolder.set(entry.getKey());
-              
-              mutation.put(_cfHolder, _cqHolder, value);
-            } else {
-              _cfHolder.set(cf.getBytes(), 0, index);
-              
-              _cqHolder.set(cf.getBytes(), index + 1, cf.length() - (index + 1));
-              _cqHolder.append(entry.getKey().getBytes(), 0, entry.getKey().length());
-              
-              mutation.put(_cfHolder, _cqHolder, value);
-            }
-          } else {
-            // Just put the Map's key into the CQ
-            _cqHolder.set(entry.getKey());
-            mutation.put(EMPTY_TEXT, _cqHolder, value);
+            addColumn(mutation, family, entry.getKey(), value);
           }
         }
-      } else if (null == cf) {
-        // We don't know what column to place the value into
-        log.warn("Was provided no column family for non-Map entry in the tuple at offset " + tupleOffset);
       } else {
-        Value value = new Value(objToBytes(o, type));
+        byte[] bytes = objToBytes(o, type);
         
-        // We have something that isn't a Map, use the provided CF as a column name
-        // and then shove the value into the Value
-        int index = cf.indexOf(COLON);
-        if (-1 == index) {
-          _cqHolder.set(cf);
+        if (null != bytes) {
+          Value value = new Value(bytes);
           
-          mutation.put(EMPTY_TEXT, _cqHolder, value);
-        } else {
-          byte[] cfBytes = cf.getBytes();
-          _cfHolder.set(cfBytes, 0, index);
-          _cqHolder.set(cfBytes, index + 1, cfBytes.length - (index + 1));
-          
-          mutation.put(_cfHolder, _cqHolder, value);
+          // We don't have any column name from non-Maps
+          addColumn(mutation, family, null, value);
         }
       }
       
@@ -219,4 +193,51 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     
     return Collections.singletonList(mutation);
   }
+  
+  /**
+   * Adds column and value to the given mutation. A columnfamily and optional column qualifier
+   * or column qualifier prefix is pulled from {@link columnDef} with the family and qualifier 
+   * delimiter being a colon. If {@link columnName} is non-null, it will be appended to the qualifier.
+   * 
+   * If both the {@link columnDef} and {@link columnName} are null, nothing is added to the mutation
+   * 
+   * @param mutation
+   * @param columnDef
+   * @param columnName
+   * @param columnValue
+   */
+  protected void addColumn(Mutation mutation, String columnDef, String columnName, Value columnValue) {
+    if (null == columnDef && null == columnName) {
+      log.warn("Was provided no name or definition for column. Ignoring value");
+      return;
+    }
+    
+    if (null != columnDef) {
+      // use the provided columnDef to make a cf (with optional cq prefix)
+      int index = columnDef.indexOf(COLON);
+      if (-1 == index) {
+        _cfHolder.set(columnDef);
+        _cqHolder.clear();
+        
+      } else {
+        byte[] cfBytes = columnDef.getBytes();
+        _cfHolder.set(cfBytes, 0, index);
+        _cqHolder.set(cfBytes, index + 1, cfBytes.length - (index + 1)); 
+      }
+    } else {
+      _cfHolder.clear();
+      _cqHolder.clear();
+    }
+    
+    // If we have a column name (this came from a Map)
+    // append that name on the cq.
+    if (null != columnName) {
+      byte[] cnBytes = columnName.getBytes();
+      
+      // CQ is either empty or has a prefix from the columnDef
+      _cqHolder.append(cnBytes, 0, cnBytes.length);
+    }
+    
+    mutation.put(_cfHolder, _cqHolder, columnValue);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/main/java/org/apache/accumulo/pig/FORMAT.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/FORMAT.java b/src/main/java/org/apache/accumulo/pig/FORMAT.java
new file mode 100644
index 0000000..a72987a
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/pig/FORMAT.java
@@ -0,0 +1,25 @@
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+
+public class FORMAT extends EvalFunc<String> {
+
+  @Override
+  public String exec(Tuple input) throws IOException {
+    if (0 == input.size()) {
+      return null;
+    }
+    
+    final String format = input.get(0).toString();
+    Object[] args = new Object[input.size() - 1];
+    for (int i = 1; i < input.size(); i++) {
+      args[i-1] = input.get(i);
+    }
+    
+    return String.format(format, args);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
index 10777a8..db80c47 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -67,8 +67,8 @@ public class AccumuloStorageTest {
     Assert.assertEquals(1, colUpdates.size());
     
     ColumnUpdate colUpdate = colUpdates.get(0);
-    Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
-    Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col".getBytes()));
+    Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes()));
+    Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
     Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value".getBytes()));
   }
   
@@ -120,8 +120,8 @@ public class AccumuloStorageTest {
     Assert.assertEquals(4, colUpdates.size());
     
     ColumnUpdate colUpdate = colUpdates.get(0);
-    Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
-    Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col1".getBytes()));
+    Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col1".getBytes()));
+    Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
     Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value1".getBytes()));
     
     colUpdate = colUpdates.get(1);
@@ -135,8 +135,8 @@ public class AccumuloStorageTest {
     Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value3".getBytes()));
     
     colUpdate = colUpdates.get(3);
-    Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
-    Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col2".getBytes()));
+    Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col2".getBytes()));
+    Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
     Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value4".getBytes()));
   }
   
@@ -161,8 +161,8 @@ public class AccumuloStorageTest {
     Assert.assertEquals(1, colUpdates.size());
     
     ColumnUpdate colUpdate = colUpdates.get(0);
-    Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
-    Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col".getBytes()));
+    Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes()));
+    Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
     Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value1".getBytes()));
   }
   
@@ -194,7 +194,7 @@ public class AccumuloStorageTest {
     Assert.assertEquals(5, colUpdates.size());
     
     Map<Entry<String,String>,String> expectations = Maps.newHashMap();
-    expectations.put(Maps.immutableEntry("", "col"), "value1");
+    expectations.put(Maps.immutableEntry("col", ""), "value1");
     expectations.put(Maps.immutableEntry("", "mapcol1"), "mapval1");
     expectations.put(Maps.immutableEntry("", "mapcol2"), "mapval2");
     expectations.put(Maps.immutableEntry("", "mapcol3"), "mapval3");


[09/10] git commit: ACCUMULO-1783 Remove the InternalMap usage to make the schema declaration in the value of the AS command in a LOAD

Posted by el...@apache.org.
ACCUMULO-1783 Remove the InternalMap usage to make the schema
declaration in the AS clause of a LOAD statement work


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/9279c77b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/9279c77b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/9279c77b

Branch: refs/heads/ACCUMULO-1783
Commit: 9279c77bba29470894ed6bb08fa626a67be6952c
Parents: e0d3ade
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 30 22:25:39 2013 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Oct 30 22:25:39 2013 -0400

----------------------------------------------------------------------
 src/main/java/org/apache/accumulo/pig/AccumuloStorage.java | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9279c77b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index cccba64..1788997 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -3,6 +3,7 @@ package org.apache.accumulo.pig;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -95,7 +96,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
           currentEntry = nextEntry;
           
           // Flush and start again
-          InternalMap map = aggregate(aggregate);
+          Map<String,Object> map = aggregate(aggregate);
           tupleEntries.add(map);
           
           aggregate = Lists.newLinkedList();
@@ -110,7 +111,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     
     // and wrap it in a tuple
     Tuple tuple = TupleFactory.getInstance().newTuple(tupleEntries.size() + 1);
-    tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+    tuple.set(0, key.getRow().toString());
     int i = 1;
     for (Object obj : tupleEntries) {
       tuple.set(i, obj);
@@ -120,8 +121,8 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     return tuple;
   }
   
-  private InternalMap aggregate(List<Entry<Key,Value>> columns) {
-    InternalMap map = new InternalMap();
+  private Map<String,Object> aggregate(List<Entry<Key,Value>> columns) {
+    Map<String,Object> map = new HashMap<String,Object>();
     for (Entry<Key,Value> column : columns) {
       map.put(column.getKey().getColumnFamily().toString() + COLON + column.getKey().getColumnQualifier().toString(),
           new DataByteArray(column.getValue().get()));


[03/10] git commit: ACCUMULO-1783 Rework AccumuloStorage, diverging a little from AccumuloWholeRowStorage. Not sure yet if this is completely desirable or not.

Posted by el...@apache.org.
ACCUMULO-1783 Rework AccumuloStorage, diverging a little from
AccumuloWholeRowStorage. Not sure yet if this is completely desirable or
not.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/58e5a7ec
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/58e5a7ec
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/58e5a7ec

Branch: refs/heads/ACCUMULO-1783
Commit: 58e5a7ec00a9198b7f31ac910ee61f1d704aaec7
Parents: 74c01ec
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 24 13:33:32 2013 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 24 13:33:32 2013 -0700

----------------------------------------------------------------------
 .../apache/accumulo/pig/AccumuloStorage.java    | 93 ++++++++++++++++++--
 .../accumulo/pig/AccumuloWholeRowStorage.java   |  2 +-
 .../pig/AbstractAccumuloStorageTest.java        |  2 +-
 .../pig/AccumuloWholeRowStorageTest.java        |  2 +-
 .../java/org/apache/accumulo/pig/TestUtils.java |  4 +-
 5 files changed, 89 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/58e5a7ec/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index c72f07f..9c8f002 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -1,23 +1,33 @@
 package org.apache.accumulo.pig;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.SortedMap;
 
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 
 import com.google.common.collect.Lists;
 
@@ -39,8 +49,72 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   
   @Override
   protected Tuple getTuple(Key key, Value value) throws IOException {
-    // TODO Auto-generated method stub
-    return null;
+    
+    SortedMap<Key,Value> rowKVs = WholeRowIterator.decodeRow(key, value);
+    List<Tuple> columns = new ArrayList<Tuple>(rowKVs.size());
+    
+    List<Object> tupleEntries = Lists.newLinkedList();
+    Iterator<Entry<Key,Value>> iter = rowKVs.entrySet().iterator();
+    List<Entry<Key,Value>> aggregate = Lists.newLinkedList();
+    Entry<Key,Value> currentEntry = null;
+    
+    while (iter.hasNext()) {
+      if (null == currentEntry) {
+        currentEntry = iter.next();
+      } else {
+        Entry<Key,Value> nextEntry = iter.next();
+        
+        // If we have the same colfam
+        if (currentEntry.getKey().equals(nextEntry.getKey(), PartialKey.ROW_COLFAM)) {
+          // Aggregate this entry into the map
+          aggregate.add(nextEntry);
+        } else {
+          // Flush and start again
+          InternalMap map = aggregate(aggregate);
+          tupleEntries.add(map);
+          
+          aggregate = Lists.newLinkedList();
+        }
+      }
+    }
+    
+    if (!aggregate.isEmpty()) {
+      tupleEntries.add(aggregate(aggregate));
+    }
+    
+    // and wrap it in a tuple
+    Tuple tuple = TupleFactory.getInstance().newTuple(tupleEntries.size() + 1);
+    tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+    int i = 1;
+    for (Object obj : tupleEntries)  {
+      tuple.set(i, obj);
+      i++;
+    }
+    
+    return tuple;
+  }
+  
+  private InternalMap aggregate(List<Entry<Key,Value>> columns)  {
+    InternalMap map = new InternalMap();
+    for (Entry<Key,Value> column : columns) {
+      map.put(column.getKey().getColumnQualifier().toString(), new DataByteArray(column.getValue().get()));
+    }
+    
+    return map;
+  }
+  
+  private Tuple columnToTuple(Text colfam, Text colqual, Text colvis, long ts, Value val) throws IOException {
+    Tuple tuple = TupleFactory.getInstance().newTuple(5);
+    tuple.set(0, new DataByteArray(colfam.getBytes()));
+    tuple.set(1, new DataByteArray(colqual.getBytes()));
+    tuple.set(2, new DataByteArray(colvis.getBytes()));
+    tuple.set(3, new Long(ts));
+    tuple.set(4, new DataByteArray(val.get()));
+    return tuple;
+  }
+  
+  protected void configureInputFormat(Configuration conf) {
+    AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
   }
   
   @Override
@@ -57,6 +131,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     Mutation mutation = new Mutation(objectToText(tupleIter.next(), (null == fieldSchemas) ? null : fieldSchemas[0]));
     
     // TODO Can these be lifted up to members of the class instead of this method?
+    // Not sure if AccumuloStorage instances need to be thread-safe or not
     final Text _cfHolder = new Text(), _cqHolder = new Text();
     
     int columnOffset = 0;
@@ -94,9 +169,9 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
             
             mutation.put(_cfHolder, _cqHolder, value);
           } else {
-            // Just put the Map's key into the CF
-            _cfHolder.set(entry.getKey());
-            mutation.put(_cfHolder, EMPTY_TEXT, value);
+            // Just put the Map's key into the CQ
+            _cqHolder.set(entry.getKey());
+            mutation.put(EMPTY_TEXT, _cqHolder, value);
           }
         }
       } else if (null == cf) {
@@ -109,13 +184,13 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
         // and then shove the value into the Value
         int index = cf.indexOf(COLON);
         if (-1 == index) {
-          _cfHolder.set(cf);
+          _cqHolder.set(cf);
           
-          mutation.put(_cfHolder, EMPTY_TEXT, value);
+          mutation.put(EMPTY_TEXT, _cqHolder, value);
         } else {
-          byte[] cfBytes = cf.getBytes(); 
+          byte[] cfBytes = cf.getBytes();
           _cfHolder.set(cfBytes, 0, index);
-          _cqHolder.set(cfBytes, index+1, cfBytes.length - (index + 1));
+          _cqHolder.set(cfBytes, index + 1, cfBytes.length - (index + 1));
           
           mutation.put(_cfHolder, _cqHolder, value);
         }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/58e5a7ec/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
index fcfd55e..af3ee01 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
@@ -84,7 +84,7 @@ public class AccumuloWholeRowStorage extends AbstractAccumuloStorage {
   }
   
   protected void configureInputFormat(Configuration conf) {
-    AccumuloInputFormat.addIterator(conf, new IteratorSetting(10, WholeRowIterator.class));
+    AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/58e5a7ec/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index 9b9c3c7..1b5b81a 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -98,7 +98,7 @@ public class AbstractAccumuloStorageTest {
   }
   
   public String getDefaultLoadLocation() {
-    return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2,col3&start=abc&end=z";
+    return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&fetch_columns=col1:cq1,col2:cq2,col3&start=abc&end=z";
   }
   
   public String getDefaultStoreLocation() {

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/58e5a7ec/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
index f8e8fe1..690d86c 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
@@ -58,7 +58,7 @@ public class AccumuloWholeRowStorageTest {
     
     Job expected = test.getDefaultExpectedLoadJob();
     Configuration expectedConf = expected.getConfiguration();
-    AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(10, WholeRowIterator.class));
+    AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(50, WholeRowIterator.class));
     
     TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
   }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/58e5a7ec/src/test/java/org/apache/accumulo/pig/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/TestUtils.java b/src/test/java/org/apache/accumulo/pig/TestUtils.java
index 6c8bebf..307a871 100644
--- a/src/test/java/org/apache/accumulo/pig/TestUtils.java
+++ b/src/test/java/org/apache/accumulo/pig/TestUtils.java
@@ -39,14 +39,14 @@ public class TestUtils {
     Iterator<Entry<String,String>> expectedIter = expectedConf.iterator();
     while (expectedIter.hasNext()) {
       Entry<String,String> e = expectedIter.next();
-      assertEquals(actualConf.get(e.getKey()), expectedConf.get(e.getKey()));
+      assertEquals("Values differed for " + e.getKey(), expectedConf.get(e.getKey()), actualConf.get(e.getKey()));
     }
     
     // Basically, for all the keys in actualConf, make sure the values in both confs are equal
     Iterator<Entry<String,String>> actualIter = actualConf.iterator();
     while (actualIter.hasNext()) {
       Entry<String,String> e = actualIter.next();
-      assertEquals(actualConf.get(e.getKey()), expectedConf.get(e.getKey()));
+      assertEquals("Values differed for " + e.getKey(), expectedConf.get(e.getKey()), actualConf.get(e.getKey()));
     }
   }
   


[02/10] git commit: ACCUMULO-1783 Add in the implementation for getMutations for AccumuloStorage.

Posted by el...@apache.org.
ACCUMULO-1783 Add in the implementation for getMutations for
AccumuloStorage.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/74c01ec2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/74c01ec2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/74c01ec2

Branch: refs/heads/ACCUMULO-1783
Commit: 74c01ec215ee7c50fd75c16d606fc8c073f25c8f
Parents: 294f9ce
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 24 12:19:29 2013 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 24 12:19:29 2013 -0700

----------------------------------------------------------------------
 pom.xml                                         |   5 +
 .../accumulo/pig/AbstractAccumuloStorage.java   | 186 +++++++++++++++++--
 .../apache/accumulo/pig/AccumuloKVStorage.java  | 150 ---------------
 .../apache/accumulo/pig/AccumuloStorage.java    | 134 +++++++++++++
 4 files changed, 312 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/74c01ec2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 249dcce..630d5e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,11 @@
       <artifactId>joda-time</artifactId>
       <version>1.6</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>15.0</version>
+    </dependency>
   </dependencies>
   
 </project>

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/74c01ec2/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 0424b8a..494fd72 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -17,10 +17,13 @@
 package org.apache.accumulo.pig;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
@@ -31,6 +34,7 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,12 +46,19 @@ import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadStoreCaster;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
+import org.joda.time.DateTime;
 
 /**
  * A LoadStoreFunc for retrieving data from and storing data to Accumulo
@@ -61,6 +72,8 @@ import org.apache.pig.impl.util.UDFContext;
 public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface {
   private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
   
+  private static final String COLON = ":", COMMA = ",";
+  
   private Configuration conf;
   private RecordReader<Key,Value> reader;
   private RecordWriter<Text,Mutation> writer;
@@ -81,7 +94,9 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   int maxWriteThreads = 10;
   long maxMutationBufferSize = 10 * 1000 * 1000;
   int maxLatency = 10 * 1000;
-  
+
+  protected LoadStoreCaster caster;
+  protected ResourceSchema schema;
   protected String contextSignature = null;
   
   public AbstractAccumuloStorage() {}
@@ -118,7 +133,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   
   private void setLocationFromUri(String location) throws IOException {
     // ex:
-    // accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&start=abc&end=z
+    // accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&fetch_columns=col1:cq1,col2:cq2&start=abc&end=z
     String columns = "";
     try {
       if (!location.startsWith("accumulo://"))
@@ -137,7 +152,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
             zookeepers = pair[1];
           else if (pair[0].equals("auths"))
             auths = pair[1];
-          else if (pair[0].equals("columns"))
+          else if (pair[0].equals("fetch_columns"))
             columns = pair[1];
           else if (pair[0].equals("start"))
             start = pair[1];
@@ -158,13 +173,13 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
       if (auths == null || auths.equals("")) {
         authorizations = new Authorizations();
       } else {
-        authorizations = new Authorizations(auths.split(","));
+        authorizations = new Authorizations(auths.split(COMMA));
       }
       
-      if (!columns.equals("")) {
-        for (String cfCq : columns.split(",")) {
-          if (cfCq.contains("|")) {
-            String[] c = cfCq.split("\\|");
+      if (!StringUtils.isEmpty(columns)) {
+        for (String cfCq : columns.split(COMMA)) {
+          if (cfCq.contains(COLON)) {
+            String[] c = cfCq.split(COLON);
             columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text(c[0]), new Text(c[1])));
           } else {
             columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text(cfCq), null));
@@ -175,7 +190,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     } catch (Exception e) {
       throw new IOException(
           "Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&"
-              + "[start=startRow,end=endRow,columns=[cf1|cq1,cf2|cq2,...],write_buffer_size_bytes=10000000,write_threads=10,write_latency_ms=30000]]': "
+              + "[start=startRow,end=endRow,fetch_columns=[cf1:cq1,cf2:cq2,...],write_buffer_size_bytes=10000000,write_threads=10,write_latency_ms=30000]]': "
               + e.getMessage());
     }
   }
@@ -255,10 +270,6 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     return new AccumuloOutputFormat();
   }
   
-  public void checkSchema(ResourceSchema schema) throws IOException {
-    // we don't care about types, they all get casted to ByteBuffers
-  }
-  
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void prepareToWrite(RecordWriter writer) {
     this.writer = writer;
@@ -280,4 +291,153 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   public void cleanupOnFailure(String failure, Job job) {}
 
   public void cleanupOnSuccess(String location, Job job) {}
+  
+  @Override
+  public void checkSchema(ResourceSchema s) throws IOException {
+    if (!(caster instanceof LoadStoreCaster)) {
+      LOG.error("Caster must implement LoadStoreCaster for writing to Accumulo.");
+      throw new IOException("Bad Caster " + caster.getClass());
+    }
+    schema = s;
+    getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
+  }
+
+  
+  protected Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+    Object o = tuple.get(i);
+    byte type = schemaToType(o, i, fieldSchemas);
+    
+    return objToText(o, type);
+  }
+  
+  protected Text objectToText(Object o, ResourceFieldSchema fieldSchema) throws IOException {
+    byte type = schemaToType(o, fieldSchema);
+    
+    return objToText(o, type);
+  }
+  
+  protected byte schemaToType(Object o, ResourceFieldSchema fieldSchema) {
+    return (fieldSchema == null) ? DataType.findType(o) : fieldSchema.getType();
+  }
+  
+  protected byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
+    return (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
+  }
+  
+  protected byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+    Object o = tuple.get(i);
+    byte type = schemaToType(o, i, fieldSchemas);
+    
+    return objToBytes(o, type);
+    
+  }
+  
+  protected long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException { 
+    Object o = tuple.get(i);
+    byte type = schemaToType(o, i, fieldSchemas);
+    
+    switch (type) {
+      case DataType.LONG:
+        return (Long) o;
+      case DataType.CHARARRAY:
+        String timestampString = (String) o;
+        try {
+          return Long.parseLong(timestampString);
+        } catch (NumberFormatException e) {
+          final String msg = "Could not cast chararray into long: " + timestampString;
+          LOG.error(msg);
+          throw new IOException(msg, e);
+        }
+      case DataType.DOUBLE:
+        Double doubleTimestamp = (Double) o;
+        return doubleTimestamp.longValue();
+      case DataType.FLOAT:
+        Float floatTimestamp = (Float) o;
+        return floatTimestamp.longValue();
+      case DataType.INTEGER:
+        Integer intTimestamp = (Integer) o;
+        return intTimestamp.longValue();
+      case DataType.BIGINTEGER:
+        BigInteger bigintTimestamp = (BigInteger) o;
+        long longTimestamp = bigintTimestamp.longValue();
+        
+        BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp);
+        
+        if (!recreatedTimestamp.equals(bigintTimestamp)) {
+          LOG.warn("Downcasting BigInteger into Long results in a change of the original value. Was " + bigintTimestamp + " but is now " + longTimestamp);
+        }
+        
+        return longTimestamp;
+      case DataType.BIGDECIMAL:
+        BigDecimal bigdecimalTimestamp = (BigDecimal) o;
+        try {
+          return bigdecimalTimestamp.longValueExact();
+        } catch (ArithmeticException e) {
+          long convertedLong = bigdecimalTimestamp.longValue();
+          LOG.warn("Downcasting BigDecimal into Long results in a loss of information. Was " + bigdecimalTimestamp + " but is now " + convertedLong);
+          return convertedLong;
+        }
+      case DataType.BYTEARRAY:
+        DataByteArray bytes = (DataByteArray) o;
+        try {
+          return Long.parseLong(bytes.toString());
+        } catch (NumberFormatException e) {
+          final String msg = "Could not cast bytes into long: " + bytes.toString();
+          LOG.error(msg);
+          throw new IOException(msg, e);
+        }
+      default:
+        LOG.error("Could not convert " + o + " of class " + o.getClass() + " into long.");
+        throw new IOException("Could not convert " + o.getClass() + " into long");
+        
+    }
+  }
+  
+  protected Text objToText(Object o, byte type) throws IOException {
+    return new Text(objToBytes(o, type));
+  }
+  
+  @SuppressWarnings("unchecked")
+  protected byte[] objToBytes(Object o, byte type) throws IOException {
+    if (o == null)
+      return null;
+    switch (type) {
+      case DataType.BYTEARRAY:
+        return ((DataByteArray) o).get();
+      case DataType.BAG:
+        return caster.toBytes((DataBag) o);
+      case DataType.CHARARRAY:
+        return caster.toBytes((String) o);
+      case DataType.DOUBLE:
+        return caster.toBytes((Double) o);
+      case DataType.FLOAT:
+        return caster.toBytes((Float) o);
+      case DataType.INTEGER:
+        return caster.toBytes((Integer) o);
+      case DataType.LONG:
+        return caster.toBytes((Long) o);
+      case DataType.BIGINTEGER:
+        return caster.toBytes((BigInteger) o);
+      case DataType.BIGDECIMAL:
+        return caster.toBytes((BigDecimal) o);
+      case DataType.BOOLEAN:
+        return caster.toBytes((Boolean) o);
+      case DataType.DATETIME:
+        return caster.toBytes((DateTime) o);
+        
+        // The type conversion here is unchecked.
+        // Relying on DataType.findType to do the right thing.
+      case DataType.MAP:
+        return caster.toBytes((Map<String,Object>) o);
+        
+      case DataType.NULL:
+        return null;
+      case DataType.TUPLE:
+        return caster.toBytes((Tuple) o);
+      case DataType.ERROR:
+        throw new IOException("Unable to determine type of " + o.getClass());
+      default:
+        throw new IOException("Unable to find a converter for tuple field " + o);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/74c01ec2/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
index 8a17e8b..8462985 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
@@ -17,11 +17,8 @@
 package org.apache.accumulo.pig;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Map;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -30,18 +27,12 @@ import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
-import org.apache.pig.LoadStoreCaster;
-import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.Utf8StorageConverter;
-import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.util.ObjectSerializer;
-import org.joda.time.DateTime;
 
 /**
  * A LoadStoreFunc for retrieving data from and storing data to Accumulo.
@@ -60,9 +51,6 @@ import org.joda.time.DateTime;
  */
 public class AccumuloKVStorage extends AbstractAccumuloStorage {
   private static final Log LOG = LogFactory.getLog(AccumuloKVStorage.class);
-  protected LoadStoreCaster caster;
-  
-  private ResourceSchema schema;
   
   public AccumuloKVStorage() {
     this.caster = new Utf8StorageConverter();
@@ -129,142 +117,4 @@ public class AccumuloKVStorage extends AbstractAccumuloStorage {
     return Collections.singleton(mut);
   }
   
-  @Override
-  public void checkSchema(ResourceSchema s) throws IOException {
-    if (!(caster instanceof LoadStoreCaster)) {
-      LOG.error("Caster must implement LoadStoreCaster for writing to Accumulo.");
-      throw new IOException("Bad Caster " + caster.getClass());
-    }
-    schema = s;
-    getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
-  }
-  
-  private Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
-    Object o = tuple.get(i);
-    byte type = schemaToType(o, i, fieldSchemas);
-    
-    return objToText(o, type);
-  }
-  
-  private byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
-    return (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
-  }
-  
-  private byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
-    Object o = tuple.get(i);
-    byte type = schemaToType(o, i, fieldSchemas);
-    
-    return objToBytes(o, type);
-    
-  }
-  
-  private long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException { 
-    Object o = tuple.get(i);
-    byte type = schemaToType(o, i, fieldSchemas);
-    
-    switch (type) {
-      case DataType.LONG:
-        return (Long) o;
-      case DataType.CHARARRAY:
-        String timestampString = (String) o;
-        try {
-          return Long.parseLong(timestampString);
-        } catch (NumberFormatException e) {
-          final String msg = "Could not cast chararray into long: " + timestampString;
-          LOG.error(msg);
-          throw new IOException(msg, e);
-        }
-      case DataType.DOUBLE:
-        Double doubleTimestamp = (Double) o;
-        return doubleTimestamp.longValue();
-      case DataType.FLOAT:
-        Float floatTimestamp = (Float) o;
-        return floatTimestamp.longValue();
-      case DataType.INTEGER:
-        Integer intTimestamp = (Integer) o;
-        return intTimestamp.longValue();
-      case DataType.BIGINTEGER:
-        BigInteger bigintTimestamp = (BigInteger) o;
-        long longTimestamp = bigintTimestamp.longValue();
-        
-        BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp);
-        
-        if (!recreatedTimestamp.equals(bigintTimestamp)) {
-          LOG.warn("Downcasting BigInteger into Long results in a change of the original value. Was " + bigintTimestamp + " but is now " + longTimestamp);
-        }
-        
-        return longTimestamp;
-      case DataType.BIGDECIMAL:
-        BigDecimal bigdecimalTimestamp = (BigDecimal) o;
-        try {
-          return bigdecimalTimestamp.longValueExact();
-        } catch (ArithmeticException e) {
-          long convertedLong = bigdecimalTimestamp.longValue();
-          LOG.warn("Downcasting BigDecimal into Long results in a loss of information. Was " + bigdecimalTimestamp + " but is now " + convertedLong);
-          return convertedLong;
-        }
-      case DataType.BYTEARRAY:
-        DataByteArray bytes = (DataByteArray) o;
-        try {
-          return Long.parseLong(bytes.toString());
-        } catch (NumberFormatException e) {
-          final String msg = "Could not cast bytes into long: " + bytes.toString();
-          LOG.error(msg);
-          throw new IOException(msg, e);
-        }
-      default:
-        LOG.error("Could not convert " + o + " of class " + o.getClass() + " into long.");
-        throw new IOException("Could not convert " + o.getClass() + " into long");
-        
-    }
-  }
-  
-  private Text objToText(Object o, byte type) throws IOException {
-    return new Text(objToBytes(o, type));
-  }
-  
-  @SuppressWarnings("unchecked")
-  private byte[] objToBytes(Object o, byte type) throws IOException {
-    if (o == null)
-      return null;
-    switch (type) {
-      case DataType.BYTEARRAY:
-        return ((DataByteArray) o).get();
-      case DataType.BAG:
-        return caster.toBytes((DataBag) o);
-      case DataType.CHARARRAY:
-        return caster.toBytes((String) o);
-      case DataType.DOUBLE:
-        return caster.toBytes((Double) o);
-      case DataType.FLOAT:
-        return caster.toBytes((Float) o);
-      case DataType.INTEGER:
-        return caster.toBytes((Integer) o);
-      case DataType.LONG:
-        return caster.toBytes((Long) o);
-      case DataType.BIGINTEGER:
-        return caster.toBytes((BigInteger) o);
-      case DataType.BIGDECIMAL:
-        return caster.toBytes((BigDecimal) o);
-      case DataType.BOOLEAN:
-        return caster.toBytes((Boolean) o);
-      case DataType.DATETIME:
-        return caster.toBytes((DateTime) o);
-        
-        // The type conversion here is unchecked.
-        // Relying on DataType.findType to do the right thing.
-      case DataType.MAP:
-        return caster.toBytes((Map<String,Object>) o);
-        
-      case DataType.NULL:
-        return null;
-      case DataType.TUPLE:
-        return caster.toBytes((Tuple) o);
-      case DataType.ERROR:
-        throw new IOException("Unable to determine type of " + o.getClass());
-      default:
-        throw new IOException("Unable to find a converter for tuple field " + o);
-    }
-  }
-  
 }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/74c01ec2/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
new file mode 100644
index 0000000..c72f07f
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -0,0 +1,134 @@
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+import com.google.common.collect.Lists;
+
+public class AccumuloStorage extends AbstractAccumuloStorage {
+  private static final Logger log = Logger.getLogger(AccumuloStorage.class);
+  private static final String COMMA = ",", COLON = ":";
+  private static final Text EMPTY_TEXT = new Text(new byte[0]);
+  
+  protected final List<String> columnSpecs;
+  
+  public AccumuloStorage(String columns) {
+    if (!StringUtils.isBlank(columns)) {
+      String[] columnArray = StringUtils.split(columns, COMMA);
+      columnSpecs = Lists.newArrayList(columnArray);
+    } else {
+      columnSpecs = Collections.emptyList();
+    }
+  }
+  
+  @Override
+  protected Tuple getTuple(Key key, Value value) throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+  
+  @Override
+  public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
+    final ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
+    
+    Iterator<Object> tupleIter = tuple.iterator();
+    
+    if (1 <= tuple.size()) {
+      log.debug("Ignoring tuple of size " + tuple.size());
+      return Collections.emptyList();
+    }
+    
+    Mutation mutation = new Mutation(objectToText(tupleIter.next(), (null == fieldSchemas) ? null : fieldSchemas[0]));
+    
+    // TODO Can these be lifted up to members of the class instead of this method?
+    final Text _cfHolder = new Text(), _cqHolder = new Text();
+    
+    int columnOffset = 0;
+    int tupleOffset = 1;
+    while (tupleIter.hasNext()) {
+      Object o = tupleIter.next();
+      String cf = null;
+      
+      // Figure out if the user provided a specific columnfamily to use.
+      if (columnOffset < columnSpecs.size()) {
+        cf = columnSpecs.get(columnOffset);
+      }
+      
+      // Grab the type for this field
+      byte type = schemaToType(o, fieldSchemas[tupleOffset]);
+      
+      // If we have a Map, we want to treat every Entry as a column in this record
+      // placing said column in the column family unless this instance of AccumuloStorage
+      // was provided a specific columnFamily to use, in which case the entry's column is
+      // in the column qualifier.
+      if (DataType.MAP == type) {
+        @SuppressWarnings("unchecked")
+        Map<String,Object> map = (Map<String,Object>) o;
+        
+        for (Entry<String,Object> entry : map.entrySet()) {
+          Object entryObject = entry.getValue();
+          byte entryType = DataType.findType(entryObject);
+          
+          Value value = new Value(objToBytes(entryObject, entryType));
+          
+          // If we have a CF, use it and push the Map's key down to the CQ
+          if (null != cf) {
+            _cfHolder.set(cf);
+            _cqHolder.set(entry.getKey());
+            
+            mutation.put(_cfHolder, _cqHolder, value);
+          } else {
+            // Just put the Map's key into the CF
+            _cfHolder.set(entry.getKey());
+            mutation.put(_cfHolder, EMPTY_TEXT, value);
+          }
+        }
+      } else if (null == cf) {
+        // We don't know what column to place the value into
+        log.warn("Was provided no column family for non-Map entry in the tuple at offset " + tupleOffset);
+      } else {
+        Value value = new Value(objToBytes(o, type));
+        
+        // We have something that isn't a Map, use the provided CF as a column name
+        // and then shove the value into the Value
+        int index = cf.indexOf(COLON);
+        if (-1 == index) {
+          _cfHolder.set(cf);
+          
+          mutation.put(_cfHolder, EMPTY_TEXT, value);
+        } else {
+          byte[] cfBytes = cf.getBytes(); 
+          _cfHolder.set(cfBytes, 0, index);
+          _cqHolder.set(cfBytes, index+1, cfBytes.length - (index + 1));
+          
+          mutation.put(_cfHolder, _cqHolder, value);
+        }
+      }
+      
+      columnOffset++;
+      tupleOffset++;
+    }
+    
+    if (0 == mutation.size()) {
+      return Collections.emptyList();
+    }
+    
+    return Collections.singletonList(mutation);
+  }
+}


[05/10] git commit: ACCUMULO-1783 Add in some tests for deserializing Key/Value back into Tuple, fixing some bugs along the way.

Posted by el...@apache.org.
ACCUMULO-1783 Add in some tests for deserializing Key/Value back into
Tuple, fixing some bugs along the way.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/8c46d9b3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/8c46d9b3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/8c46d9b3

Branch: refs/heads/ACCUMULO-1783
Commit: 8c46d9b3262e3568dd613f80569d1f11944cf262
Parents: e227ad4
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 24 19:55:01 2013 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 24 19:55:01 2013 -0700

----------------------------------------------------------------------
 .../apache/accumulo/pig/AccumuloStorage.java    |  11 +-
 .../accumulo/pig/AccumuloStorageTest.java       | 112 +++++++++++++++++++
 2 files changed, 120 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/8c46d9b3/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 3759181..030b0c3 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -67,6 +67,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     while (iter.hasNext()) {
       if (null == currentEntry) {
         currentEntry = iter.next();
+        aggregate.add(currentEntry);
       } else {
         Entry<Key,Value> nextEntry = iter.next();
         
@@ -75,11 +76,14 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
           // Aggregate this entry into the map
           aggregate.add(nextEntry);
         } else {
+          currentEntry = nextEntry;
+          
           // Flush and start again
           InternalMap map = aggregate(aggregate);
           tupleEntries.add(map);
           
           aggregate = Lists.newLinkedList();
+          aggregate.add(currentEntry);
         }
       }
     }
@@ -92,7 +96,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     Tuple tuple = TupleFactory.getInstance().newTuple(tupleEntries.size() + 1);
     tuple.set(0, new DataByteArray(key.getRow().getBytes()));
     int i = 1;
-    for (Object obj : tupleEntries)  {
+    for (Object obj : tupleEntries) {
       tuple.set(i, obj);
       i++;
     }
@@ -100,10 +104,11 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     return tuple;
   }
   
-  private InternalMap aggregate(List<Entry<Key,Value>> columns)  {
+  private InternalMap aggregate(List<Entry<Key,Value>> columns) {
     InternalMap map = new InternalMap();
     for (Entry<Key,Value> column : columns) {
-      map.put(column.getKey().getColumnQualifier().toString(), new DataByteArray(column.getValue().get()));
+      map.put(column.getKey().getColumnFamily().toString() + COLON + column.getKey().getColumnQualifier().toString(),
+          new DataByteArray(column.getValue().get()));
     }
     
     return map;

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/8c46d9b3/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
index 38d0260..10777a8 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -8,12 +8,18 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class AccumuloStorageTest {
@@ -294,4 +300,110 @@ public class AccumuloStorageTest {
     Assert.assertTrue("Did not find all expectations", expectations.isEmpty());
   }
   
+  @Test
+  public void testSingleKey() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage();
+    
+    List<Key> keys = Lists.newArrayList();
+    List<Value> values = Lists.newArrayList();
+    
+    keys.add(new Key("1", "", "col1"));
+    values.add(new Value("value1".getBytes()));
+    
+    Key k = new Key("1");
+    Value v = WholeRowIterator.encodeRow(keys, values);
+    
+    Tuple t = storage.getTuple(k, v);
+    
+    Assert.assertEquals(2, t.size());
+    
+    Assert.assertEquals("1", t.get(0).toString());
+    
+    InternalMap map = new InternalMap();
+    map.put(":col1", new DataByteArray("value1"));
+    
+    Assert.assertEquals(map, t.get(1));
+  }
+  
+  @Test
+  public void testSingleColumn() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage();
+    
+    List<Key> keys = Lists.newArrayList();
+    List<Value> values = Lists.newArrayList();
+    
+    keys.add(new Key("1", "col1", "cq1"));
+    keys.add(new Key("1", "col1", "cq2"));
+    keys.add(new Key("1", "col1", "cq3"));
+    
+    values.add(new Value("value1".getBytes()));
+    values.add(new Value("value2".getBytes()));
+    values.add(new Value("value3".getBytes()));
+    
+    Key k = new Key("1");
+    Value v = WholeRowIterator.encodeRow(keys, values);
+    
+    Tuple t = storage.getTuple(k, v);
+    
+    Assert.assertEquals(2, t.size());
+    
+    Assert.assertEquals("1", t.get(0).toString());
+    
+    InternalMap map = new InternalMap();
+    map.put("col1:cq1", new DataByteArray("value1"));
+    map.put("col1:cq2", new DataByteArray("value2"));
+    map.put("col1:cq3", new DataByteArray("value3"));
+    
+    Assert.assertEquals(map, t.get(1));
+  }
+  
+  @Test
+  public void testMultipleColumns() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage();
+    
+    List<Key> keys = Lists.newArrayList();
+    List<Value> values = Lists.newArrayList();
+    
+    keys.add(new Key("1", "col1", "cq1"));
+    keys.add(new Key("1", "col1", "cq2"));
+    keys.add(new Key("1", "col1", "cq3"));
+    keys.add(new Key("1", "col2", "cq1"));
+    keys.add(new Key("1", "col3", "cq1"));
+    keys.add(new Key("1", "col3", "cq2"));
+    
+    values.add(new Value("value1".getBytes()));
+    values.add(new Value("value2".getBytes()));
+    values.add(new Value("value3".getBytes()));
+    values.add(new Value("value1".getBytes()));
+    values.add(new Value("value1".getBytes()));
+    values.add(new Value("value2".getBytes()));
+    
+    Key k = new Key("1");
+    Value v = WholeRowIterator.encodeRow(keys, values);
+    
+    Tuple t = storage.getTuple(k, v);
+    
+    Assert.assertEquals(4, t.size());
+    
+    Assert.assertEquals("1", t.get(0).toString());
+    
+    InternalMap map = new InternalMap();
+    map.put("col1:cq1", new DataByteArray("value1"));
+    map.put("col1:cq2", new DataByteArray("value2"));
+    map.put("col1:cq3", new DataByteArray("value3"));
+    
+    Assert.assertEquals(map, t.get(1));
+    
+    map = new InternalMap();
+    map.put("col2:cq1", new DataByteArray("value1"));
+    
+    Assert.assertEquals(map, t.get(2));
+    
+    map = new InternalMap();
+    map.put("col3:cq1", new DataByteArray("value1"));
+    map.put("col3:cq2", new DataByteArray("value2"));
+    
+    Assert.assertEquals(map, t.get(3));
+  }
+  
 }


[04/10] git commit: ACCUMULO-1783 Update tests for new functionality and API changes

Posted by el...@apache.org.
ACCUMULO-1783 Update tests for new functionality and API changes


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/e227ad4a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/e227ad4a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/e227ad4a

Branch: refs/heads/ACCUMULO-1783
Commit: e227ad4ab6d4277644e2aecefb1e2c8dc3db22cb
Parents: 58e5a7e
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 24 19:31:00 2013 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 24 19:31:00 2013 -0700

----------------------------------------------------------------------
 .../accumulo/pig/AbstractAccumuloStorage.java   |   2 +-
 .../apache/accumulo/pig/AccumuloStorage.java    |  40 ++-
 .../accumulo/pig/AccumuloStorageTest.java       | 297 +++++++++++++++++++
 3 files changed, 322 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e227ad4a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 494fd72..2361dcf 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -208,7 +208,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
       AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
       AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
       if (columnFamilyColumnQualifierPairs.size() > 0) {
-        LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+        LOG.debug("columns: " + columnFamilyColumnQualifierPairs);
         AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
       }
       

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e227ad4a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 9c8f002..3759181 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.Utf8StorageConverter;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.InternalMap;
@@ -38,7 +39,13 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   
   protected final List<String> columnSpecs;
   
+  public AccumuloStorage() {
+    this("");
+  }
+  
   public AccumuloStorage(String columns) {
+    this.caster = new Utf8StorageConverter();
+    
     if (!StringUtils.isBlank(columns)) {
       String[] columnArray = StringUtils.split(columns, COMMA);
       columnSpecs = Lists.newArrayList(columnArray);
@@ -51,7 +58,6 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   protected Tuple getTuple(Key key, Value value) throws IOException {
     
     SortedMap<Key,Value> rowKVs = WholeRowIterator.decodeRow(key, value);
-    List<Tuple> columns = new ArrayList<Tuple>(rowKVs.size());
     
     List<Object> tupleEntries = Lists.newLinkedList();
     Iterator<Entry<Key,Value>> iter = rowKVs.entrySet().iterator();
@@ -103,16 +109,6 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     return map;
   }
   
-  private Tuple columnToTuple(Text colfam, Text colqual, Text colvis, long ts, Value val) throws IOException {
-    Tuple tuple = TupleFactory.getInstance().newTuple(5);
-    tuple.set(0, new DataByteArray(colfam.getBytes()));
-    tuple.set(1, new DataByteArray(colqual.getBytes()));
-    tuple.set(2, new DataByteArray(colvis.getBytes()));
-    tuple.set(3, new Long(ts));
-    tuple.set(4, new DataByteArray(val.get()));
-    return tuple;
-  }
-  
   protected void configureInputFormat(Configuration conf) {
     AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
   }
@@ -123,7 +119,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     
     Iterator<Object> tupleIter = tuple.iterator();
     
-    if (1 <= tuple.size()) {
+    if (1 >= tuple.size()) {
       log.debug("Ignoring tuple of size " + tuple.size());
       return Collections.emptyList();
     }
@@ -146,7 +142,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
       }
       
       // Grab the type for this field
-      byte type = schemaToType(o, fieldSchemas[tupleOffset]);
+      byte type = schemaToType(o, (null == fieldSchemas) ? null : fieldSchemas[tupleOffset]);
       
       // If we have a Map, we want to treat every Entry as a column in this record
       // placing said column in the column family unless this instance of AccumuloStorage
@@ -164,10 +160,22 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
           
           // If we have a CF, use it and push the Map's key down to the CQ
           if (null != cf) {
-            _cfHolder.set(cf);
-            _cqHolder.set(entry.getKey());
+            int index = cf.indexOf(COLON);
             
-            mutation.put(_cfHolder, _cqHolder, value);
+            // No colon in the provided column
+            if (-1 == index) {
+              _cfHolder.set(cf);
+              _cqHolder.set(entry.getKey());
+              
+              mutation.put(_cfHolder, _cqHolder, value);
+            } else {
+              _cfHolder.set(cf.getBytes(), 0, index);
+              
+              _cqHolder.set(cf.getBytes(), index + 1, cf.length() - (index + 1));
+              _cqHolder.append(entry.getKey().getBytes(), 0, entry.getKey().length());
+              
+              mutation.put(_cfHolder, _cqHolder, value);
+            }
           } else {
             // Just put the Map's key into the CQ
             _cqHolder.set(entry.getKey());

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e227ad4a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
new file mode 100644
index 0000000..38d0260
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -0,0 +1,297 @@
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class AccumuloStorageTest {
+  
+  @Test
+  public void test1Tuple() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage();
+    
+    Tuple t = TupleFactory.getInstance().newTuple(1);
+    t.set(0, "row");
+    
+    Collection<Mutation> mutations = storage.getMutations(t);
+    
+    Assert.assertEquals(0, mutations.size());
+  }
+  
+  @Test
+  public void test2TupleNoColumn() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage();
+    
+    Tuple t = TupleFactory.getInstance().newTuple(2);
+    t.set(0, "row");
+    t.set(1, "value");
+    
+    Assert.assertEquals(0, storage.getMutations(t).size());
+  }
+  
+  @Test
+  public void test2TupleWithColumn() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage("col");
+    
+    Tuple t = TupleFactory.getInstance().newTuple(2);
+    t.set(0, "row");
+    t.set(1, "value");
+    
+    Collection<Mutation> mutations = storage.getMutations(t);
+    
+    Assert.assertEquals(1, mutations.size());
+    
+    Mutation m = mutations.iterator().next();
+    
+    Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+    
+    List<ColumnUpdate> colUpdates = m.getUpdates();
+    Assert.assertEquals(1, colUpdates.size());
+    
+    ColumnUpdate colUpdate = colUpdates.get(0);
+    Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
+    Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col".getBytes()));
+    Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value".getBytes()));
+  }
+  
+  @Test
+  public void test2TupleWithColumnQual() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage("col:qual");
+    
+    Tuple t = TupleFactory.getInstance().newTuple(2);
+    t.set(0, "row");
+    t.set(1, "value");
+    
+    Collection<Mutation> mutations = storage.getMutations(t);
+    
+    Assert.assertEquals(1, mutations.size());
+    
+    Mutation m = mutations.iterator().next();
+    
+    Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+    
+    List<ColumnUpdate> colUpdates = m.getUpdates();
+    Assert.assertEquals(1, colUpdates.size());
+    
+    ColumnUpdate colUpdate = colUpdates.get(0);
+    Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes()));
+    Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "qual".getBytes()));
+    Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value".getBytes()));
+  }
+  
+  @Test
+  public void test2TupleWithMixedColumns() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage("col1,col1:qual,col2:qual,col2");
+    
+    Tuple t = TupleFactory.getInstance().newTuple(5);
+    t.set(0, "row");
+    t.set(1, "value1");
+    t.set(2, "value2");
+    t.set(3, "value3");
+    t.set(4, "value4");
+    
+    Collection<Mutation> mutations = storage.getMutations(t);
+    
+    Assert.assertEquals(1, mutations.size());
+    
+    Mutation m = mutations.iterator().next();
+    
+    Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+    
+    List<ColumnUpdate> colUpdates = m.getUpdates();
+    Assert.assertEquals(4, colUpdates.size());
+    
+    ColumnUpdate colUpdate = colUpdates.get(0);
+    Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
+    Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col1".getBytes()));
+    Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value1".getBytes()));
+    
+    colUpdate = colUpdates.get(1);
+    Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col1".getBytes()));
+    Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "qual".getBytes()));
+    Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value2".getBytes()));
+    
+    colUpdate = colUpdates.get(2);
+    Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col2".getBytes()));
+    Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "qual".getBytes()));
+    Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value3".getBytes()));
+    
+    colUpdate = colUpdates.get(3);
+    Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
+    Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col2".getBytes()));
+    Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value4".getBytes()));
+  }
+  
+  @Test
+  public void testIgnoredExtraColumns() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage("col");
+    
+    Tuple t = TupleFactory.getInstance().newTuple(3);
+    t.set(0, "row");
+    t.set(1, "value1");
+    t.set(2, "value2");
+    
+    Collection<Mutation> mutations = storage.getMutations(t);
+    
+    Assert.assertEquals(1, mutations.size());
+    
+    Mutation m = mutations.iterator().next();
+    
+    Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+    
+    List<ColumnUpdate> colUpdates = m.getUpdates();
+    Assert.assertEquals(1, colUpdates.size());
+    
+    ColumnUpdate colUpdate = colUpdates.get(0);
+    Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
+    Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col".getBytes()));
+    Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value1".getBytes()));
+  }
+  
+  @Test
+  public void testNonIgnoredExtraAsMap() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage("col");
+    
+    // A map in the field after the configured column: each entry is expected to become its own
+    // column in the empty column family, qualified by the map key.
+    Map<String,Object> map = Maps.newHashMap();
+    
+    map.put("mapcol1", "mapval1");
+    map.put("mapcol2", "mapval2");
+    map.put("mapcol3", "mapval3");
+    map.put("mapcol4", "mapval4");
+    
+    Tuple t = TupleFactory.getInstance().newTuple(3);
+    t.set(0, "row");
+    t.set(1, "value1");
+    t.set(2, map);
+    
+    Collection<Mutation> mutations = storage.getMutations(t);
+    
+    Assert.assertEquals(1, mutations.size());
+    
+    Mutation m = mutations.iterator().next();
+    
+    Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+    
+    List<ColumnUpdate> colUpdates = m.getUpdates();
+    Assert.assertEquals(5, colUpdates.size());
+    
+    Map<Entry<String,String>,String> expectations = Maps.newHashMap();
+    expectations.put(Maps.immutableEntry("", "col"), "value1");
+    expectations.put(Maps.immutableEntry("", "mapcol1"), "mapval1");
+    expectations.put(Maps.immutableEntry("", "mapcol2"), "mapval2");
+    expectations.put(Maps.immutableEntry("", "mapcol3"), "mapval3");
+    expectations.put(Maps.immutableEntry("", "mapcol4"), "mapval4");
+    
+    for (ColumnUpdate update : colUpdates) {
+      Entry<String,String> key = Maps.immutableEntry(new String(update.getColumnFamily()), new String(update.getColumnQualifier()));
+      String value = new String(update.getValue());
+      Assert.assertTrue("Unexpected column " + key, expectations.containsKey(key));
+      
+      // remove() so a duplicate update for the same column fails the containsKey check above
+      String expected = expectations.remove(key);
+      // JUnit's assertEquals takes (message, expected, actual) in that order
+      Assert.assertEquals("Wrong value for " + key, expected, value);
+    }
+    
+    Assert.assertTrue("Did not find all expectations", expectations.isEmpty());
+  }
+  
+  @Test
+  public void testMapWithColFam() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage("col");
+    
+    // The map is the only column: the configured name "col" is expected to become the column
+    // family and each map key the column qualifier.
+    Map<String,Object> map = Maps.newHashMap();
+    
+    map.put("mapcol1", "mapval1");
+    map.put("mapcol2", "mapval2");
+    map.put("mapcol3", "mapval3");
+    map.put("mapcol4", "mapval4");
+    
+    Tuple t = TupleFactory.getInstance().newTuple(2);
+    t.set(0, "row");
+    t.set(1, map);
+    
+    Collection<Mutation> mutations = storage.getMutations(t);
+    
+    Assert.assertEquals(1, mutations.size());
+    
+    Mutation m = mutations.iterator().next();
+    
+    Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+    
+    List<ColumnUpdate> colUpdates = m.getUpdates();
+    Assert.assertEquals(4, colUpdates.size());
+    
+    Map<Entry<String,String>,String> expectations = Maps.newHashMap();
+    expectations.put(Maps.immutableEntry("col", "mapcol1"), "mapval1");
+    expectations.put(Maps.immutableEntry("col", "mapcol2"), "mapval2");
+    expectations.put(Maps.immutableEntry("col", "mapcol3"), "mapval3");
+    expectations.put(Maps.immutableEntry("col", "mapcol4"), "mapval4");
+    
+    for (ColumnUpdate update : colUpdates) {
+      Entry<String,String> key = Maps.immutableEntry(new String(update.getColumnFamily()), new String(update.getColumnQualifier()));
+      String value = new String(update.getValue());
+      Assert.assertTrue("Unexpected column " + key, expectations.containsKey(key));
+      
+      // remove() so a duplicate update for the same column fails the containsKey check above
+      String expected = expectations.remove(key);
+      // JUnit's assertEquals takes (message, expected, actual) in that order
+      Assert.assertEquals("Wrong value for " + key, expected, value);
+    }
+    
+    Assert.assertTrue("Did not find all expectations", expectations.isEmpty());
+  }
+  
+  @Test
+  public void testMapWithColFamColQualPrefix() throws IOException {
+    // "col:qual_" is expected to split into column family "col" and qualifier prefix "qual_",
+    // which is prepended to each map key.
+    AccumuloStorage storage = new AccumuloStorage("col:qual_");
+    
+    Map<String,Object> map = Maps.newHashMap();
+    
+    map.put("mapcol1", "mapval1");
+    map.put("mapcol2", "mapval2");
+    map.put("mapcol3", "mapval3");
+    map.put("mapcol4", "mapval4");
+    
+    Tuple t = TupleFactory.getInstance().newTuple(2);
+    t.set(0, "row");
+    t.set(1, map);
+    
+    Collection<Mutation> mutations = storage.getMutations(t);
+    
+    Assert.assertEquals(1, mutations.size());
+    
+    Mutation m = mutations.iterator().next();
+    
+    Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+    
+    List<ColumnUpdate> colUpdates = m.getUpdates();
+    Assert.assertEquals(4, colUpdates.size());
+    
+    Map<Entry<String,String>,String> expectations = Maps.newHashMap();
+    expectations.put(Maps.immutableEntry("col", "qual_mapcol1"), "mapval1");
+    expectations.put(Maps.immutableEntry("col", "qual_mapcol2"), "mapval2");
+    expectations.put(Maps.immutableEntry("col", "qual_mapcol3"), "mapval3");
+    expectations.put(Maps.immutableEntry("col", "qual_mapcol4"), "mapval4");
+    
+    for (ColumnUpdate update : colUpdates) {
+      Entry<String,String> key = Maps.immutableEntry(new String(update.getColumnFamily()), new String(update.getColumnQualifier()));
+      String value = new String(update.getValue());
+      Assert.assertTrue("Unexpected column " + key, expectations.containsKey(key));
+      
+      // remove() so a duplicate update for the same column fails the containsKey check above
+      String expected = expectations.remove(key);
+      // JUnit's assertEquals takes (message, expected, actual) in that order
+      Assert.assertEquals("Wrong value for " + key, expected, value);
+    }
+    
+    Assert.assertTrue("Did not find all expectations", expectations.isEmpty());
+  }
+  
+}


[07/10] git commit: ACCUMULO-1783 Add some additional logging on AIF setup

Posted by el...@apache.org.
ACCUMULO-1783 Add some additional logging on AIF setup


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/291b9392
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/291b9392
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/291b9392

Branch: refs/heads/ACCUMULO-1783
Commit: 291b9392ae53b4e3943869e7b8041dc98560353b
Parents: ad03c51
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 30 22:16:35 2013 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Oct 30 22:16:35 2013 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/pig/AbstractAccumuloStorage.java    | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/291b9392/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index c2345cc..37efe84 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -208,11 +208,16 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
       AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
       AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
       if (columnFamilyColumnQualifierPairs.size() > 0) {
-        LOG.debug("columns: " + columnFamilyColumnQualifierPairs);
+        LOG.info("columns: " + columnFamilyColumnQualifierPairs);
         AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
       }
       
-      AccumuloInputFormat.setRanges(conf, Collections.singleton(new Range(start, end)));
+      Collection<Range> ranges = Collections.singleton(new Range(start, end));
+      
+      LOG.info("Scanning Accumulo for " + ranges);
+      
+      AccumuloInputFormat.setRanges(conf, ranges);
+      
       configureInputFormat(conf);
     }
   }