Posted to commits@hive.apache.org by kh...@apache.org on 2014/08/20 00:41:13 UTC

svn commit: r1619005 [3/9] - in /hive/trunk: ./ accumulo-handler/ accumulo-handler/src/ accumulo-handler/src/java/ accumulo-handler/src/java/org/ accumulo-handler/src/java/org/apache/ accumulo-handler/src/java/org/apache/hadoop/ accumulo-handler/src/ja...

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.predicate;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloRowIdColumnMapping;
+import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
+import org.apache.hadoop.hive.accumulo.predicate.compare.Equal;
+import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThan;
+import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThanOrEqual;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LessThan;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LessThanOrEqual;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantLongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantShortObjectInspector;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates Accumulo Ranges from a Hive filter expression over the column mapped to the
+ * Accumulo rowid: Ranges from AND nodes are intersected, Ranges from OR nodes are merged.
+ */
+public class AccumuloRangeGenerator implements NodeProcessor {
+  private static final Logger log = LoggerFactory.getLogger(AccumuloRangeGenerator.class);
+
+  private final AccumuloPredicateHandler predicateHandler;
+  private final HiveAccumuloRowIdColumnMapping rowIdMapping;
+  private final String hiveRowIdColumnName;
+
+  public AccumuloRangeGenerator(AccumuloPredicateHandler predicateHandler,
+      HiveAccumuloRowIdColumnMapping rowIdMapping, String hiveRowIdColumnName) {
+    this.predicateHandler = predicateHandler;
+    this.rowIdMapping = rowIdMapping;
+    this.hiveRowIdColumnName = hiveRowIdColumnName;
+  }
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+      throws SemanticException {
+    // If it's not some operator, pass it back
+    if (!(nd instanceof ExprNodeGenericFuncDesc)) {
+      return nd;
+    }
+
+    ExprNodeGenericFuncDesc func = (ExprNodeGenericFuncDesc) nd;
+
+    // 'and' nodes need to be intersected
+    if (FunctionRegistry.isOpAnd(func)) {
+      return processAndOpNode(nd, nodeOutputs);
+    } else if (FunctionRegistry.isOpOr(func)) {
+      // 'or' nodes need to be merged
+      return processOrOpNode(nd, nodeOutputs);
+    } else if (FunctionRegistry.isOpNot(func)) {
+      // TODO handle negations
+      throw new IllegalArgumentException("Negations not yet implemented");
+    } else {
+      return processExpression(func, nodeOutputs);
+    }
+  }
+
+  protected Object processAndOpNode(Node nd, Object[] nodeOutputs) {
+    // We might have multiple ranges coming from children
+    List<Range> andRanges = null;
+
+    for (Object nodeOutput : nodeOutputs) {
+      // null signifies nodes that are irrelevant to the generation
+      // of Accumulo Ranges
+      if (null == nodeOutput) {
+        continue;
+      }
+
+      // Two cases must be kept separate: an AND with no relevant children (a conjunction
+      // over fields other than the column mapped to the Accumulo rowid) and a conjunction
+      // whose intersected Ranges are empty (its children are disjoint).
+      //
+      // A null `andRanges` means that no ranges could be computed, while an empty List
+      // of Ranges means that there are no possible Ranges to look up.
+      if (null == andRanges) {
+        andRanges = new ArrayList<Range>();
+      }
+
+      // The child is a single Range
+      if (nodeOutput instanceof Range) {
+        Range childRange = (Range) nodeOutput;
+
+        // No existing ranges, just accept the current
+        if (andRanges.isEmpty()) {
+          andRanges.add(childRange);
+        } else {
+          // For each range we have, intersect them. If they don't overlap
+          // the range can be discarded
+          List<Range> newRanges = new ArrayList<Range>();
+          for (Range andRange : andRanges) {
+            Range intersectedRange = andRange.clip(childRange, true);
+            if (null != intersectedRange) {
+              newRanges.add(intersectedRange);
+            }
+          }
+
+          // Set the newly-constructed ranges as the current state
+          andRanges = newRanges;
+        }
+      } else if (nodeOutput instanceof List) {
+        @SuppressWarnings("unchecked")
+        List<Range> childRanges = (List<Range>) nodeOutput;
+
+        // No ranges, use the ranges from the child
+        if (andRanges.isEmpty()) {
+          andRanges.addAll(childRanges);
+        } else {
+          List<Range> newRanges = new ArrayList<Range>();
+
+          // Cartesian product of our ranges, to the child ranges
+          for (Range andRange : andRanges) {
+            for (Range childRange : childRanges) {
+              Range intersectedRange = andRange.clip(childRange, true);
+
+              // Retain only valid intersections (discard disjoint ranges)
+              if (null != intersectedRange) {
+                newRanges.add(intersectedRange);
+              }
+            }
+          }
+
+          // Set the newly-constructed ranges as the current state
+          andRanges = newRanges;
+        }
+      } else {
+        log.error("Expected Range from {} but got {}", nd, nodeOutput);
+        throw new IllegalArgumentException("Expected Range but got "
+            + nodeOutput.getClass().getName());
+      }
+    }
+
+    return andRanges;
+  }
+
+  protected Object processOrOpNode(Node nd, Object[] nodeOutputs) {
+    List<Range> orRanges = new ArrayList<Range>(nodeOutputs.length);
+    for (Object nodeOutput : nodeOutputs) {
+      if (nodeOutput instanceof Range) {
+        orRanges.add((Range) nodeOutput);
+      } else if (nodeOutput instanceof List) {
+        @SuppressWarnings("unchecked")
+        List<Range> childRanges = (List<Range>) nodeOutput;
+        orRanges.addAll(childRanges);
+      } else {
+        log.error("Expected Range from " + nd + " but got " + nodeOutput);
+        throw new IllegalArgumentException("Expected Range but got "
+            + nodeOutput.getClass().getName());
+      }
+    }
+
+    // Try to merge multiple ranges together
+    if (orRanges.size() > 1) {
+      return Range.mergeOverlapping(orRanges);
+    } else if (1 == orRanges.size()) {
+      // Return just the single Range
+      return orRanges.get(0);
+    } else {
+      // No ranges, just return the empty list
+      return orRanges;
+    }
+  }
+
+  protected Object processExpression(ExprNodeGenericFuncDesc func, Object[] nodeOutputs)
+      throws SemanticException {
+    // a binary operator (gt, lt, ge, le, eq, ne)
+    GenericUDF genericUdf = func.getGenericUDF();
+
+    // Find the argument to the operator which is a constant
+    ExprNodeConstantDesc constantDesc = null;
+    ExprNodeColumnDesc columnDesc = null;
+    ExprNodeDesc leftHandNode = null;
+    for (Object nodeOutput : nodeOutputs) {
+      if (nodeOutput instanceof ExprNodeConstantDesc) {
+        // Ordering of constant and column in expression is important in correct range generation
+        if (null == leftHandNode) {
+          leftHandNode = (ExprNodeDesc) nodeOutput;
+        }
+
+        constantDesc = (ExprNodeConstantDesc) nodeOutput;
+      } else if (nodeOutput instanceof ExprNodeColumnDesc) {
+        // Ordering of constant and column in expression is important in correct range generation
+        if (null == leftHandNode) {
+          leftHandNode = (ExprNodeDesc) nodeOutput;
+        }
+
+        columnDesc = (ExprNodeColumnDesc) nodeOutput;
+      }
+    }
+
+    // If it's constant = constant or column = column, we can't fetch any ranges
+    // TODO We can try to be smarter and push up the value to some node which
+    // we can generate ranges from e.g. rowid > (4 + 5)
+    if (null == constantDesc || null == columnDesc) {
+      return null;
+    }
+
+    // Reject any clauses that are against a column that isn't the rowId mapping
+    if (!this.hiveRowIdColumnName.equals(columnDesc.getColumn())) {
+      return null;
+    }
+
+    ConstantObjectInspector objInspector = constantDesc.getWritableObjectInspector();
+
+    Text constText;
+    switch (rowIdMapping.getEncoding()) {
+      case STRING:
+        constText = getUtf8Value(objInspector);
+        break;
+      case BINARY:
+        try {
+          constText = getBinaryValue(objInspector);
+        } catch (IOException e) {
+          throw new SemanticException(e);
+        }
+        break;
+      default:
+        throw new SemanticException("Unable to parse unknown encoding: "
+            + rowIdMapping.getEncoding());
+    }
+
+    Class<? extends CompareOp> opClz;
+    try {
+      opClz = predicateHandler.getCompareOpClass(genericUdf.getUdfName());
+    } catch (NoSuchCompareOpException e) {
+      throw new IllegalArgumentException("Unhandled UDF class: " + genericUdf.getUdfName());
+    }
+
+    if (leftHandNode instanceof ExprNodeConstantDesc) {
+      return getConstantOpColumnRange(opClz, constText);
+    } else if (leftHandNode instanceof ExprNodeColumnDesc) {
+      return getColumnOpConstantRange(opClz, constText);
+    } else {
+      throw new IllegalStateException("Expected column or constant on LHS of expression");
+    }
+  }
+
+  protected Range getConstantOpColumnRange(Class<? extends CompareOp> opClz, Text constText) {
+    if (opClz.equals(Equal.class)) {
+      // 100 == x
+      return new Range(constText); // single row
+    } else if (opClz.equals(GreaterThanOrEqual.class)) {
+      // 100 >= x
+      return new Range(null, constText); // neg-infinity to end inclusive
+    } else if (opClz.equals(GreaterThan.class)) {
+      // 100 > x
+      return new Range(null, false, constText, false); // neg-infinity to end exclusive
+    } else if (opClz.equals(LessThanOrEqual.class)) {
+      // 100 <= x
+      return new Range(constText, true, null, false); // start inclusive to infinity
+    } else if (opClz.equals(LessThan.class)) {
+      // 100 < x
+      return new Range(constText, false, null, false); // start exclusive to infinity
+    } else {
+      throw new IllegalArgumentException("Could not process " + opClz);
+    }
+  }
+
+  protected Range getColumnOpConstantRange(Class<? extends CompareOp> opClz, Text constText) {
+    if (opClz.equals(Equal.class)) {
+      return new Range(constText); // start inclusive to end inclusive
+    } else if (opClz.equals(GreaterThanOrEqual.class)) {
+      return new Range(constText, null); // start inclusive to infinity inclusive
+    } else if (opClz.equals(GreaterThan.class)) {
+      return new Range(constText, false, null, false); // start exclusive to infinity inclusive
+    } else if (opClz.equals(LessThanOrEqual.class)) {
+      return new Range(null, false, constText, true); // neg-infinity to end inclusive
+    } else if (opClz.equals(LessThan.class)) {
+      return new Range(null, false, constText, false); // neg-infinity to end exclusive
+    } else {
+      throw new IllegalArgumentException("Could not process " + opClz);
+    }
+  }
+
+  protected Text getUtf8Value(ConstantObjectInspector objInspector) {
+    // TODO is there a more correct way to get the literal value for the Object?
+    return new Text(objInspector.getWritableConstantValue().toString());
+  }
+
+  /**
+   * Attempts to construct the binary value from the given inspector. Falls back to UTF8 encoding
+   * when the value cannot be coerced into binary.
+   *
+   * @return Binary value when possible, utf8 otherwise
+   * @throws IOException
+   */
+  protected Text getBinaryValue(ConstantObjectInspector objInspector) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    if (objInspector instanceof WritableConstantBooleanObjectInspector) {
+      LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+          (WritableConstantBooleanObjectInspector) objInspector);
+    } else if (objInspector instanceof WritableConstantByteObjectInspector) {
+      LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+          (WritableConstantByteObjectInspector) objInspector);
+    } else if (objInspector instanceof WritableConstantShortObjectInspector) {
+      LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+          (WritableConstantShortObjectInspector) objInspector);
+    } else if (objInspector instanceof WritableConstantIntObjectInspector) {
+      LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+          (WritableConstantIntObjectInspector) objInspector);
+    } else if (objInspector instanceof WritableConstantLongObjectInspector) {
+      LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+          (WritableConstantLongObjectInspector) objInspector);
+    } else if (objInspector instanceof WritableConstantDoubleObjectInspector) {
+      LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+          (WritableConstantDoubleObjectInspector) objInspector);
+    } else if (objInspector instanceof WritableConstantFloatObjectInspector) {
+      LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+          (WritableConstantFloatObjectInspector) objInspector);
+    } else {
+      return getUtf8Value(objInspector);
+    }
+
+    out.close();
+    return new Text(out.toByteArray());
+  }
+}
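
For reference: the AND and OR handling above maps directly onto the Accumulo client API. A minimal sketch (illustrative, not part of this commit) exercising the same clip()/mergeOverlapping() semantics as processAndOpNode and processOrOpNode:

import java.util.Arrays;
import java.util.List;

import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.io.Text;

public class RangeCombinationSketch {
  public static void main(String[] args) {
    // AND: rowid >= "f" AND rowid < "m" intersects via clip(), as in processAndOpNode
    Range ge = new Range(new Text("f"), true, null, false);  // [f, +inf)
    Range lt = new Range(null, false, new Text("m"), false); // (-inf, m)
    System.out.println(ge.clip(lt, true)); // [f, m); clip returns null when disjoint

    // OR: rowid < "c" OR rowid >= "x" merges via mergeOverlapping(), as in processOrOpNode
    List<Range> merged = Range.mergeOverlapping(Arrays.asList(
        new Range(null, false, new Text("c"), false),
        new Range(new Text("x"), true, null, false)));
    System.out.println(merged); // the two disjoint ranges are kept as-is
  }
}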

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchCompareOpException.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchCompareOpException.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchCompareOpException.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchCompareOpException.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.predicate;
+
+/**
+ * Used when a {@link org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp} was specified
+ * but one with that name cannot be found
+ */
+public class NoSuchCompareOpException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  public NoSuchCompareOpException() {
+    super();
+  }
+
+  public NoSuchCompareOpException(String msg) {
+    super(msg);
+  }
+
+  public NoSuchCompareOpException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+}

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchPrimitiveComparisonException.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchPrimitiveComparisonException.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchPrimitiveComparisonException.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchPrimitiveComparisonException.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.predicate;
+
+import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;
+
+/**
+ * Used when a {@link PrimitiveComparison} was specified but one with that name cannot be found
+ */
+public class NoSuchPrimitiveComparisonException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  public NoSuchPrimitiveComparisonException() {
+    super();
+  }
+
+  public NoSuchPrimitiveComparisonException(String msg) {
+    super(msg);
+  }
+
+  public NoSuchPrimitiveComparisonException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+}

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,123 @@
+package org.apache.hadoop.hive.accumulo.predicate;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMappingFactory;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping;
+import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
+import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Operates over a single qualifier.
+ *
+ * Delegates to PrimitiveComparison and CompareOp instances for value acceptance.
+ *
+ * The PrimitiveComparison strategy assumes a consistent value type for the same column family and
+ * qualifier.
+ */
+public class PrimitiveComparisonFilter extends WholeRowIterator {
+  @SuppressWarnings("unused")
+  private static final Logger log = Logger.getLogger(PrimitiveComparisonFilter.class);
+
+  public static final String FILTER_PREFIX = "accumulo.filter.compare.iterator.";
+  public static final String P_COMPARE_CLASS = "accumulo.filter.iterator.p.compare.class";
+  public static final String COMPARE_OPT_CLASS = "accumulo.filter.iterator.compare.opt.class";
+  public static final String CONST_VAL = "accumulo.filter.iterator.const.val";
+  public static final String COLUMN = "accumulo.filter.iterator.qual";
+
+  private Text cfHolder, cqHolder, columnMappingFamily, columnMappingQualifier;
+  private HiveAccumuloColumnMapping columnMapping;
+  private CompareOp compOpt;
+
+  @Override
+  protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) {
+    SortedMap<Key,Value> items;
+    boolean allow;
+    try { // if key doesn't contain CF, it's an encoded value from a previous iterator.
+      while (keys.get(0).getColumnFamily().getBytes().length == 0) {
+        items = decodeRow(keys.get(0), values.get(0));
+        keys = Lists.newArrayList(items.keySet());
+        values = Lists.newArrayList(items.values());
+      }
+      allow = accept(keys, values);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return allow;
+  }
+
+  private boolean accept(Collection<Key> keys, Collection<Value> values) {
+    Iterator<Key> kIter = keys.iterator();
+    Iterator<Value> vIter = values.iterator();
+    while (kIter.hasNext()) {
+      Key k = kIter.next();
+      Value v = vIter.next();
+      if (matchQualAndFam(k)) {
+        return compOpt.accept(v.get());
+      }
+    }
+    return false;
+  }
+
+  private boolean matchQualAndFam(Key k) {
+    k.getColumnFamily(cfHolder);
+    k.getColumnQualifier(cqHolder);
+    return cfHolder.equals(columnMappingFamily) && cqHolder.equals(columnMappingQualifier);
+  }
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+      IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    String serializedColumnMapping = options.get(COLUMN);
+    Entry<String,String> pair = ColumnMappingFactory.parseMapping(serializedColumnMapping);
+
+    // The ColumnEncoding, column name and type are all irrelevant at this point, just need the
+    // cf:[cq]
+    columnMapping = new HiveAccumuloColumnMapping(pair.getKey(), pair.getValue(),
+        ColumnEncoding.STRING, "column", "string");
+    columnMappingFamily = new Text(columnMapping.getColumnFamily());
+    columnMappingQualifier = new Text(columnMapping.getColumnQualifier());
+    cfHolder = new Text();
+    cqHolder = new Text();
+
+    try {
+      Class<?> pClass = Class.forName(options.get(P_COMPARE_CLASS));
+      Class<?> cClazz = Class.forName(options.get(COMPARE_OPT_CLASS));
+      PrimitiveComparison pCompare = pClass.asSubclass(PrimitiveComparison.class).newInstance();
+      compOpt = cClazz.asSubclass(CompareOp.class).newInstance();
+      byte[] constant = getConstant(options);
+      pCompare.init(constant);
+      compOpt.setPrimitiveCompare(pCompare);
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
+
+  protected byte[] getConstant(Map<String,String> options) {
+    String b64Const = options.get(CONST_VAL);
+    return Base64.decodeBase64(b64Const.getBytes());
+  }
+}
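
As a usage sketch (illustrative, not part of this commit; the priority, iterator name, column, and constant are hypothetical), the options above would be wired into a scanner roughly as follows, with the constant base64-encoded to mirror getConstant():

import java.nio.ByteBuffer;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter;
import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThan;
import org.apache.hadoop.hive.accumulo.predicate.compare.IntCompare;

public class FilterSetupSketch {
  // Attach a "cf:cq > 100" filter to an existing Scanner
  static void attachFilter(Scanner scanner) {
    IteratorSetting is = new IteratorSetting(50, "hive-pcf", PrimitiveComparisonFilter.class);
    is.addOption(PrimitiveComparisonFilter.P_COMPARE_CLASS, IntCompare.class.getName());
    is.addOption(PrimitiveComparisonFilter.COMPARE_OPT_CLASS, GreaterThan.class.getName());
    byte[] constant = ByteBuffer.allocate(4).putInt(100).array(); // layout IntCompare expects
    is.addOption(PrimitiveComparisonFilter.CONST_VAL, new String(Base64.encodeBase64(constant)));
    is.addOption(PrimitiveComparisonFilter.COLUMN, "cf:cq"); // assumes the cf:cq form parsed by
                                                             // ColumnMappingFactory.parseMapping
    scanner.addScanIterator(is);
  }
}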

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PushdownTuple.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PushdownTuple.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PushdownTuple.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PushdownTuple.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.predicate;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
+import org.apache.hadoop.hive.accumulo.predicate.compare.DoubleCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.IntCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LongCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;
+import org.apache.hadoop.hive.accumulo.predicate.compare.StringCompare;
+import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
+import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * For use in IteratorSetting construction.
+ *
+ * Encapsulates a constant byte[], a PrimitiveComparison instance, and a CompareOp instance.
+ */
+public class PushdownTuple {
+  private static final Logger log = Logger.getLogger(PushdownTuple.class);
+
+  private byte[] constVal;
+  private PrimitiveComparison pCompare;
+  private CompareOp cOpt;
+
+  public PushdownTuple(IndexSearchCondition sc, PrimitiveComparison pCompare, CompareOp cOpt)
+      throws SerDeException {
+    ExprNodeConstantEvaluator eval = new ExprNodeConstantEvaluator(sc.getConstantDesc());
+
+    try {
+      this.pCompare = pCompare;
+      this.cOpt = cOpt;
+      Writable writable = (Writable) eval.evaluate(null);
+      constVal = getConstantAsBytes(writable);
+    } catch (ClassCastException cce) {
+      log.info(StringUtils.stringifyException(cce));
+      throw new SerDeException(" Column type mismatch in where clause "
+          + sc.getComparisonExpr().getExprString() + " found type "
+          + sc.getConstantDesc().getTypeString() + " instead of "
+          + sc.getColumnDesc().getTypeString());
+    } catch (HiveException e) {
+      throw new SerDeException(e);
+    }
+  }
+
+  public byte[] getConstVal() {
+    return constVal;
+  }
+
+  public PrimitiveComparison getpCompare() {
+    return pCompare;
+  }
+
+  public CompareOp getcOpt() {
+    return cOpt;
+  }
+
+  /**
+   * @param writable
+   *          the constant value taken from the predicate
+   * @return byte[] encoding of the writable, in the layout the active PrimitiveComparison expects
+   * @throws SerDeException
+   *           if the writable's type is not supported
+   */
+  public byte[] getConstantAsBytes(Writable writable) throws SerDeException {
+    if (pCompare instanceof StringCompare) {
+      return writable.toString().getBytes();
+    } else if (pCompare instanceof DoubleCompare) {
+      byte[] bts = new byte[8];
+      double val = ((DoubleWritable) writable).get();
+      ByteBuffer.wrap(bts).putDouble(val);
+      return bts;
+    } else if (pCompare instanceof IntCompare) {
+      byte[] bts = new byte[4];
+      int val = ((IntWritable) writable).get();
+      ByteBuffer.wrap(bts).putInt(val);
+      return bts;
+    } else if (pCompare instanceof LongCompare) {
+      byte[] bts = new byte[8];
+      long val = ((LongWritable) writable).get();
+      ByteBuffer.wrap(bts).putLong(val);
+      return bts;
+    } else {
+      throw new SerDeException("Unsupported primitive category: " + pCompare.getClass().getName());
+    }
+  }
+
+}
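
The byte layouts produced by getConstantAsBytes() line up with what the PrimitiveComparison implementations decode. A round-trip sketch (illustrative, not part of this commit; the value 42 is arbitrary):

import java.nio.ByteBuffer;

import org.apache.hadoop.hive.accumulo.predicate.compare.IntCompare;

public class ConstantBytesSketch {
  public static void main(String[] args) {
    // An int constant becomes 4 big-endian bytes, as in getConstantAsBytes...
    byte[] bts = new byte[4];
    ByteBuffer.wrap(bts).putInt(42);

    // ...and IntCompare.serialize reads the same layout back
    IntCompare ic = new IntCompare();
    ic.init(bts);
    System.out.println(ic.isEqual(bts)); // true
  }
}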

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/CompareOp.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/CompareOp.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/CompareOp.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/CompareOp.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,26 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Handles different types of comparisons in Hive predicates. The filter iterator delegates value
+ * acceptance to the CompareOp.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}. Works with
+ * {@link PrimitiveComparison}
+ */
+public interface CompareOp {
+  /**
+   * Sets the PrimitiveComparison for this CompareOp
+   */
+  public void setPrimitiveCompare(PrimitiveComparison comp);
+
+  /**
+   * @return The PrimitiveComparison this CompareOp is a part of
+   */
+  public PrimitiveComparison getPrimitiveCompare();
+
+  /**
+   * @param val The bytes from the Accumulo Value
+   * @return true if the value is accepted by this CompareOp
+   */
+  public boolean accept(byte[] val);
+}
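
A short sketch of the delegation this interface describes (illustrative, not part of this commit): a CompareOp holds a PrimitiveComparison and answers accept() by delegating the typed comparison to it.

import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
import org.apache.hadoop.hive.accumulo.predicate.compare.Equal;
import org.apache.hadoop.hive.accumulo.predicate.compare.StringCompare;

public class CompareOpSketch {
  public static void main(String[] args) {
    StringCompare strCompare = new StringCompare();
    strCompare.init("apple".getBytes()); // the constant from the predicate
    CompareOp op = new Equal();
    op.setPrimitiveCompare(strCompare);
    System.out.println(op.accept("apple".getBytes()));  // true
    System.out.println(op.accept("banana".getBytes())); // false
  }
}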

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/DoubleCompare.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/DoubleCompare.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/DoubleCompare.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/DoubleCompare.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,90 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+/**
+ * Set of comparison operations over a double constant. Used for Hive predicates involving double
+ * comparison.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class DoubleCompare implements PrimitiveComparison {
+
+  private BigDecimal constant;
+
+  @Override
+  public void init(byte[] constant) {
+    this.constant = serialize(constant);
+  }
+
+  /**
+   * @return BigDecimal holding the double decoded from the byte[] value
+   */
+  public BigDecimal serialize(byte[] value) {
+    try {
+      return new BigDecimal(ByteBuffer.wrap(value).asDoubleBuffer().get());
+    } catch (Exception e) {
+      throw new RuntimeException(e.toString() + " occurred trying to build double value. "
+          + "Make sure the value type for the byte[] is double.");
+    }
+  }
+
+  /**
+   * @return true if double value is equal to constant, false otherwise.
+   */
+  @Override
+  public boolean isEqual(byte[] value) {
+    return serialize(value).compareTo(constant) == 0;
+  }
+
+  /**
+   * @return true if double value not equal to constant, false otherwise.
+   */
+  @Override
+  public boolean isNotEqual(byte[] value) {
+    return serialize(value).compareTo(constant) != 0;
+  }
+
+  /**
+   * @return true if value greater than or equal to constant, false otherwise.
+   */
+  @Override
+  public boolean greaterThanOrEqual(byte[] value) {
+    return serialize(value).compareTo(constant) >= 0;
+  }
+
+  /**
+   * @return true if value greater than constant, false otherwise.
+   */
+  @Override
+  public boolean greaterThan(byte[] value) {
+    return serialize(value).compareTo(constant) > 0;
+  }
+
+  /**
+   * @return true if value less than or equal to constant, false otherwise.
+   */
+  @Override
+  public boolean lessThanOrEqual(byte[] value) {
+    return serialize(value).compareTo(constant) <= 0;
+  }
+
+  /**
+   * @return true if value less than constant, false otherwise.
+   */
+  @Override
+  public boolean lessThan(byte[] value) {
+    return serialize(value).compareTo(constant) < 0;
+  }
+
+  /**
+   * not supported for this PrimitiveCompare implementation.
+   */
+  @Override
+  public boolean like(byte[] value) {
+    throw new UnsupportedOperationException("Like not supported for " + getClass().getName());
+  }
+}
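
A usage sketch (illustrative, not part of this commit; values arbitrary). The operand must be the 8-byte encoding that serialize() decodes; the comparison itself runs through BigDecimal:

import java.nio.ByteBuffer;

import org.apache.hadoop.hive.accumulo.predicate.compare.DoubleCompare;

public class DoubleCompareSketch {
  public static void main(String[] args) {
    DoubleCompare dc = new DoubleCompare();
    dc.init(toBytes(10.5)); // the predicate constant
    System.out.println(dc.lessThan(toBytes(3.2)));            // true: 3.2 < 10.5
    System.out.println(dc.greaterThanOrEqual(toBytes(10.5))); // true: 10.5 >= 10.5
  }

  static byte[] toBytes(double d) {
    byte[] bts = new byte[8];
    ByteBuffer.wrap(bts).putDouble(d); // same layout PushdownTuple produces
    return bts;
  }
}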

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Equal.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Equal.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Equal.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Equal.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to isEqual() over a {@link PrimitiveComparison} instance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class Equal implements CompareOp {
+
+  private PrimitiveComparison comp;
+
+  public Equal() {}
+
+  public Equal(PrimitiveComparison comp) {
+    this.comp = comp;
+  }
+
+  @Override
+  public void setPrimitiveCompare(PrimitiveComparison comp) {
+    this.comp = comp;
+  }
+
+  @Override
+  public PrimitiveComparison getPrimitiveCompare() {
+    return comp;
+  }
+
+  @Override
+  public boolean accept(byte[] val) {
+    return comp.isEqual(val);
+  }
+}

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThan.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThan.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThan.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThan.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to greaterThan over {@link PrimitiveComparison} instance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class GreaterThan implements CompareOp {
+
+  private PrimitiveComparison comp;
+
+  public GreaterThan() {}
+
+  public GreaterThan(PrimitiveComparison comp) {
+    this.comp = comp;
+  }
+
+  @Override
+  public void setPrimitiveCompare(PrimitiveComparison comp) {
+    this.comp = comp;
+  }
+
+  @Override
+  public PrimitiveComparison getPrimitiveCompare() {
+    return this.comp;
+  }
+
+  @Override
+  public boolean accept(byte[] val) {
+    return comp.greaterThan(val);
+  }
+}

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThanOrEqual.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThanOrEqual.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThanOrEqual.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThanOrEqual.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to greaterThanOrEqual over {@link PrimitiveComparison} instance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class GreaterThanOrEqual implements CompareOp {
+
+  private PrimitiveComparison comp;
+
+  public GreaterThanOrEqual() {}
+
+  public GreaterThanOrEqual(PrimitiveComparison comp) {
+    this.comp = comp;
+  }
+
+  @Override
+  public void setPrimitiveCompare(PrimitiveComparison comp) {
+    this.comp = comp;
+  }
+
+  @Override
+  public PrimitiveComparison getPrimitiveCompare() {
+    return comp;
+  }
+
+  @Override
+  public boolean accept(byte[] val) {
+    return comp.greaterThanOrEqual(val);
+  }
+}

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/IntCompare.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/IntCompare.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/IntCompare.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/IntCompare.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,63 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Set of comparison operations over an integer constant. Used for Hive predicates involving int
+ * comparison.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class IntCompare implements PrimitiveComparison {
+
+  private int constant;
+
+  @Override
+  public void init(byte[] constant) {
+    this.constant = serialize(constant);
+  }
+
+  @Override
+  public boolean isEqual(byte[] value) {
+    return serialize(value) == constant;
+  }
+
+  @Override
+  public boolean isNotEqual(byte[] value) {
+    return serialize(value) != constant;
+  }
+
+  @Override
+  public boolean greaterThanOrEqual(byte[] value) {
+    return serialize(value) >= constant;
+  }
+
+  @Override
+  public boolean greaterThan(byte[] value) {
+    return serialize(value) > constant;
+  }
+
+  @Override
+  public boolean lessThanOrEqual(byte[] value) {
+    return serialize(value) <= constant;
+  }
+
+  @Override
+  public boolean lessThan(byte[] value) {
+    return serialize(value) < constant;
+  }
+
+  @Override
+  public boolean like(byte[] value) {
+    throw new UnsupportedOperationException("Like not supported for " + getClass().getName());
+  }
+
+  public Integer serialize(byte[] value) {
+    try {
+      return ByteBuffer.wrap(value).asIntBuffer().get();
+    } catch (Exception e) {
+      throw new RuntimeException(e.toString() + " occurred trying to build int value. "
+          + "Make sure the value type for the byte[] is int.");
+    }
+  }
+}

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThan.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThan.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThan.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThan.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to lessThan over {@link PrimitiveComparison} instance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class LessThan implements CompareOp {
+
+  private PrimitiveComparison comp;
+
+  public LessThan() {}
+
+  public LessThan(PrimitiveComparison comp) {
+    this.comp = comp;
+  }
+
+  @Override
+  public void setPrimitiveCompare(PrimitiveComparison comp) {
+    this.comp = comp;
+  }
+
+  @Override
+  public PrimitiveComparison getPrimitiveCompare() {
+    return comp;
+  }
+
+  @Override
+  public boolean accept(byte[] val) {
+    return comp.lessThan(val);
+  }
+}

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThanOrEqual.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThanOrEqual.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThanOrEqual.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThanOrEqual.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to lessThanOrEqual over {@link PrimitiveComparison} instance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class LessThanOrEqual implements CompareOp {
+
+  private PrimitiveComparison comp;
+
+  public LessThanOrEqual() {}
+
+  public LessThanOrEqual(PrimitiveComparison comp) {
+    this.comp = comp;
+  }
+
+  @Override
+  public void setPrimitiveCompare(PrimitiveComparison comp) {
+    this.comp = comp;
+  }
+
+  @Override
+  public PrimitiveComparison getPrimitiveCompare() {
+    return comp;
+  }
+
+  @Override
+  public boolean accept(byte[] val) {
+    return comp.lessThanOrEqual(val);
+  }
+}

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Like.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Like.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Like.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Like.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,33 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to like over {@link PrimitiveComparison} instance. Currently only supported by
+ * StringCompare.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class Like implements CompareOp {
+
+  private PrimitiveComparison comp;
+
+  public Like() {}
+
+  public Like(PrimitiveComparison comp) {
+    this.comp = comp;
+  }
+
+  @Override
+  public void setPrimitiveCompare(PrimitiveComparison comp) {
+    this.comp = comp;
+  }
+
+  @Override
+  public PrimitiveComparison getPrimitiveCompare() {
+    return comp;
+  }
+
+  @Override
+  public boolean accept(byte[] val) {
+    return comp.like(val);
+  }
+}

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LongCompare.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LongCompare.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LongCompare.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LongCompare.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,64 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Set of comparison operations over a long constant. Used for Hive predicates involving long
+ * comparison.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class LongCompare implements PrimitiveComparison {
+
+  private long constant;
+
+  @Override
+  public void init(byte[] constant) {
+    this.constant = serialize(constant);
+  }
+
+  @Override
+  public boolean isEqual(byte[] value) {
+    return serialize(value) == constant;
+  }
+
+  @Override
+  public boolean isNotEqual(byte[] value) {
+    return serialize(value) != constant;
+  }
+
+  @Override
+  public boolean greaterThanOrEqual(byte[] value) {
+    return serialize(value) >= constant;
+  }
+
+  @Override
+  public boolean greaterThan(byte[] value) {
+    return serialize(value) > constant;
+  }
+
+  @Override
+  public boolean lessThanOrEqual(byte[] value) {
+    return serialize(value) <= constant;
+  }
+
+  @Override
+  public boolean lessThan(byte[] value) {
+    return serialize(value) < constant;
+  }
+
+  @Override
+  public boolean like(byte[] value) {
+    throw new UnsupportedOperationException("Like not supported for " + getClass().getName());
+  }
+
+  public Long serialize(byte[] value) {
+    try {
+      return ByteBuffer.wrap(value).asLongBuffer().get();
+    } catch (Exception e) {
+      throw new RuntimeException(e.toString() + " occurred trying to build long value. "
+          + "Make sure the value type for the byte[] is long.");
+    }
+  }
+}

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/NotEqual.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/NotEqual.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/NotEqual.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/NotEqual.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to isEqual over {@link PrimitiveComparison} instance and returns the negation.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class NotEqual implements CompareOp {
+
+  private PrimitiveComparison comp;
+
+  public NotEqual() {}
+
+  public NotEqual(PrimitiveComparison comp) {
+    this.comp = comp;
+  }
+
+  @Override
+  public void setPrimitiveCompare(PrimitiveComparison comp) {
+    this.comp = comp;
+  }
+
+  @Override
+  public PrimitiveComparison getPrimitiveCompare() {
+    return comp;
+  }
+
+  @Override
+  public boolean accept(byte[] val) {
+    return !comp.isEqual(val);
+  }
+}

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/PrimitiveComparison.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/PrimitiveComparison.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/PrimitiveComparison.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/PrimitiveComparison.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps type-specific comparison operations over a constant value. Methods take the raw bytes of
+ * incoming Accumulo values.
+ *
+ * The CompareOp instance in the iterator uses one or more methods from a PrimitiveComparison
+ * implementation to perform type-specific comparisons and determine acceptance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}. Works with
+ * {@link CompareOp}
+ */
+public interface PrimitiveComparison {
+
+  public boolean isEqual(byte[] value);
+
+  public boolean isNotEqual(byte[] value);
+
+  public boolean greaterThanOrEqual(byte[] value);
+
+  public boolean greaterThan(byte[] value);
+
+  public boolean lessThanOrEqual(byte[] value);
+
+  public boolean lessThan(byte[] value);
+
+  public boolean like(byte[] value);
+
+  public Object serialize(byte[] value);
+
+  public void init(byte[] constant);
+}
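
As an illustration of this contract (a hypothetical implementation, not part of this commit), a comparison over 2-byte short values could follow the IntCompare pattern:

import java.nio.ByteBuffer;

import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;

/** Hypothetical 2-byte short comparison, mirroring IntCompare. */
public class ShortCompare implements PrimitiveComparison {
  private short constant;

  @Override
  public void init(byte[] constant) { this.constant = serialize(constant); }

  // Covariant return: the interface declares Object serialize(byte[])
  @Override
  public Short serialize(byte[] value) { return ByteBuffer.wrap(value).asShortBuffer().get(); }

  @Override
  public boolean isEqual(byte[] value) { return serialize(value) == constant; }
  @Override
  public boolean isNotEqual(byte[] value) { return serialize(value) != constant; }
  @Override
  public boolean greaterThanOrEqual(byte[] value) { return serialize(value) >= constant; }
  @Override
  public boolean greaterThan(byte[] value) { return serialize(value) > constant; }
  @Override
  public boolean lessThanOrEqual(byte[] value) { return serialize(value) <= constant; }
  @Override
  public boolean lessThan(byte[] value) { return serialize(value) < constant; }
  @Override
  public boolean like(byte[] value) {
    throw new UnsupportedOperationException("Like not supported for " + getClass().getName());
  }
}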

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/StringCompare.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/StringCompare.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/StringCompare.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/StringCompare.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,65 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Set of comparison operations over a string constant. Used for Hive predicates involving string
+ * comparison.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class StringCompare implements PrimitiveComparison {
+  @SuppressWarnings("unused")
+  private static final Logger log = Logger.getLogger(StringCompare.class);
+
+  private String constant;
+
+  @Override
+  public void init(byte[] constant) {
+    this.constant = serialize(constant);
+  }
+
+  @Override
+  public boolean isEqual(byte[] value) {
+    return serialize(value).equals(constant);
+  }
+
+  @Override
+  public boolean isNotEqual(byte[] value) {
+    return !isEqual(value);
+  }
+
+  @Override
+  public boolean greaterThanOrEqual(byte[] value) {
+    return serialize(value).compareTo(constant) >= 0;
+  }
+
+  @Override
+  public boolean greaterThan(byte[] value) {
+    return serialize(value).compareTo(constant) > 0;
+  }
+
+  @Override
+  public boolean lessThanOrEqual(byte[] value) {
+    return serialize(value).compareTo(constant) <= 0;
+  }
+
+  @Override
+  public boolean lessThan(byte[] value) {
+    return serialize(value).compareTo(constant) < 0;
+  }
+
+  @Override
+  public boolean like(byte[] value) {
+    // Build the regex from the LIKE pattern (the constant) and match it against the cell value
+    String regex = constant.replaceAll("%", ".*");
+    return Pattern.compile(regex).matcher(serialize(value)).matches();
+  }
+
+  public String serialize(byte[] value) {
+    return new String(value);
+  }
+}

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/package-info.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/package-info.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/package-info.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/package-info.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,4 @@
+/**
+ * PrimitiveComparison and CompareOp implementations for use in the PrimitiveComparisonFilter iterator
+ */
+package org.apache.hadoop.hive.accumulo.predicate.compare;
\ No newline at end of file

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/package-info.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/package-info.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/package-info.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/package-info.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,4 @@
+/**
+ * Predicate pushdown to Accumulo filter iterators.
+ */
+package org.apache.hadoop.hive.accumulo.predicate;
\ No newline at end of file

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloCompositeRowId.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloCompositeRowId.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloCompositeRowId.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloCompositeRowId.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo.serde;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+
+/**
+ * AccumuloCompositeRowId is an extension of LazyStruct. All complex composite keys should extend
+ * this class
+ * and override the {@link LazyStruct#getField(int)} method where fieldID corresponds to the ID of a
+ * key in the composite key.
+ * <p>
+ * For example, for a composite key <i>"/part1/part2/part3"</i>, <i>part1</i> will have an id
+ * <i>0</i>, <i>part2</i> will have an id <i>1</i> and <i>part3</i> will have an id <i>2</i>. Custom
+ * implementations of getField(fieldID) should return the value corresponding to that fieldID. So,
+ * for the above example, the value returned for <i>getField(0)</i> should be <i>part1</i>,
+ * <i>getField(1)</i> should be <i>part2</i> and <i>getField(2)</i> should be <i>part3</i>.
+ * </p>
+ *
+ * <p>
+ * All custom implementations are expected to have a constructor of the form:
+ *
+ * <pre>
+ * MyCustomCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl, Configuration conf)
+ * </pre>
+ * </p>
+ *
+ */
+public class AccumuloCompositeRowId extends LazyStruct {
+
+  public AccumuloCompositeRowId(LazySimpleStructObjectInspector oi) {
+    super(oi);
+  }
+
+  @Override
+  public ArrayList<Object> getFieldsAsList() {
+    ArrayList<Object> allFields = new ArrayList<Object>();
+
+    List<? extends StructField> fields = oi.getAllStructFieldRefs();
+
+    for (int i = 0; i < fields.size(); i++) {
+      allFields.add(getField(i));
+    }
+
+    return allFields;
+  }
+
+  /**
+   * Create and initialize a {@link LazyObject} with the given bytes for the given fieldID.
+   *
+   * @param fieldID
+   *          field for which the object is to be created
+   * @param bytes
+   *          value with which the object is to be initialized
+   * @return the initialized {@link LazyObject}
+   */
+  public LazyObject<? extends ObjectInspector> toLazyObject(int fieldID, byte[] bytes) {
+    ObjectInspector fieldOI = oi.getAllStructFieldRefs().get(fieldID).getFieldObjectInspector();
+
+    LazyObject<? extends ObjectInspector> lazyObject = LazyFactory.createLazyObject(fieldOI);
+
+    ByteArrayRef ref = new ByteArrayRef();
+
+    ref.setData(bytes);
+
+    // initialize the lazy object
+    lazyObject.init(ref, 0, ref.getData().length);
+
+    return lazyObject;
+  }
+}

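The contract described in the AccumuloCompositeRowId Javadoc above is easiest to see with a concrete subclass. Below is a minimal, hypothetical sketch that splits a rowId of the form "part1/part2/part3" on a slash; the class name, the delimiter, and the reliance on the protected bytes/start/length fields inherited from LazyStruct are illustrative assumptions, not part of this commit.

import java.util.Arrays;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.accumulo.serde.AccumuloCompositeRowId;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;

import com.google.common.base.Charsets;

public class SlashSeparatedRowId extends AccumuloCompositeRowId {

  // All implementations are expected to provide this three-argument constructor
  public SlashSeparatedRowId(LazySimpleStructObjectInspector oi, Properties tbl,
      Configuration conf) {
    super(oi);
  }

  @Override
  public Object getField(int fieldID) {
    // bytes, start and length are populated by LazyStruct#init(ByteArrayRef, int, int)
    byte[] data = Arrays.copyOfRange(bytes.getData(), start, start + length);
    // fieldID 0 returns part1, 1 returns part2, 2 returns part3
    String[] parts = new String(data, Charsets.UTF_8).split("/");
    return toLazyObject(fieldID, parts[fieldID].getBytes(Charsets.UTF_8));
  }
}
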
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowIdFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowIdFactory.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowIdFactory.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowIdFactory.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.serde;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+/**
+ * Interface for providing custom Accumulo RowID generation/parsing
+ */
+public interface AccumuloRowIdFactory {
+
+  /**
+   * Initialize the factory with the given SerDe parameters and table properties
+   */
+  public void init(AccumuloSerDeParameters serDeParams, Properties properties)
+      throws SerDeException;
+
+  /**
+   * Create a custom ObjectInspector for the Accumulo rowId
+   *
+   * @param type
+   *          type information
+   */
+  public ObjectInspector createRowIdObjectInspector(TypeInfo type) throws SerDeException;
+
+  /**
+   * Create a custom LazyObjectBase for the Accumulo rowId
+   *
+   * @param inspector
+   *          OI created by {@link AccumuloRowIdFactory#createRowIdObjectInspector}
+   */
+  public LazyObjectBase createRowId(ObjectInspector inspector) throws SerDeException;
+
+  /**
+   * Serialize the given Hive object into the internal byte format of the custom key
+   */
+  public byte[] serializeRowId(Object object, StructField field, ByteStream.Output output)
+      throws IOException;
+
+  /**
+   * Add the jars containing this implementation to the classpath for the Job
+   */
+  public void addDependencyJars(Configuration conf) throws IOException;
+}

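As a rough sketch of implementing the AccumuloRowIdFactory interface above, the following hypothetical factory defers to Hive's stock lazy machinery and a naive UTF-8 encoding. The class name is invented; the LazyFactory and getSerDeParameters() calls mirror how they are used elsewhere in this commit, and the assumption that LazyFactory.createLazyObject's result can serve as the LazyObjectBase return value follows its use in AccumuloCompositeRowId#toLazyObject.

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

import com.google.common.base.Charsets;

public class PlainStringRowIdFactory implements AccumuloRowIdFactory {
  private SerDeParameters lazyParams;

  @Override
  public void init(AccumuloSerDeParameters serDeParams, Properties properties)
      throws SerDeException {
    this.lazyParams = serDeParams.getSerDeParameters();
  }

  @Override
  public ObjectInspector createRowIdObjectInspector(TypeInfo type) throws SerDeException {
    // Reuse the same lazy object inspector Hive would create for any other column
    return LazyFactory.createLazyObjectInspector(type, lazyParams.getSeparators(), 1,
        lazyParams.getNullSequence(), lazyParams.isEscaped(), lazyParams.getEscapeChar());
  }

  @Override
  public LazyObjectBase createRowId(ObjectInspector inspector) throws SerDeException {
    // Assumes LazyObject implements LazyObjectBase, as its use in
    // AccumuloCompositeRowId#toLazyObject suggests
    return LazyFactory.createLazyObject(inspector);
  }

  @Override
  public byte[] serializeRowId(Object object, StructField field, ByteStream.Output output)
      throws IOException {
    // Naive encoding: the UTF-8 bytes of toString(); a real factory would honor the
    // declared type and column encoding
    output.reset();
    byte[] bytes = String.valueOf(object).getBytes(Charsets.UTF_8);
    output.write(bytes, 0, bytes.length);
    return output.toByteArray();
  }

  @Override
  public void addDependencyJars(Configuration conf) throws IOException {
    // Nothing beyond the handler itself to ship for this sketch
  }
}
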
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowSerializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowSerializer.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowSerializer.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowSerializer.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.serde;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMapping;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloMapColumnMapping;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+
+/**
+ * Serializes a Struct to an Accumulo row as per the definition provided by the
+ * {@link ColumnMapping}s
+ */
+public class AccumuloRowSerializer {
+  private static final Logger log = Logger.getLogger(AccumuloRowSerializer.class);
+
+  private final int rowIdOffset;
+  private final ByteStream.Output output;
+  private final SerDeParameters serDeParams;
+  private final List<ColumnMapping> mappings;
+  private final ColumnVisibility visibility;
+  private final AccumuloRowIdFactory rowIdFactory;
+
+  public AccumuloRowSerializer(int primaryKeyOffset, SerDeParameters serDeParams,
+      List<ColumnMapping> mappings, ColumnVisibility visibility, AccumuloRowIdFactory rowIdFactory) {
+    Preconditions.checkArgument(primaryKeyOffset >= 0,
+        "A valid offset to the mapping for the Accumulo RowID is required, received "
+            + primaryKeyOffset);
+    this.rowIdOffset = primaryKeyOffset;
+    this.output = new ByteStream.Output();
+    this.serDeParams = serDeParams;
+    this.mappings = mappings;
+    this.visibility = visibility;
+    this.rowIdFactory = rowIdFactory;
+  }
+
+  public Mutation serialize(Object obj, ObjectInspector objInspector) throws SerDeException,
+      IOException {
+    if (objInspector.getCategory() != ObjectInspector.Category.STRUCT) {
+      throw new SerDeException(getClass().toString()
+          + " can only serialize struct types, but we got: " + objInspector.getTypeName());
+    }
+
+    // Prepare the field ObjectInspectors
+    StructObjectInspector soi = (StructObjectInspector) objInspector;
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+    List<Object> columnValues = soi.getStructFieldsDataAsList(obj);
+
+    // Fail if we try to access an offset out of bounds
+    if (rowIdOffset >= fields.size()) {
+      throw new IllegalStateException(
+          "Attempted to access field outside of definition for struct. Have " + fields.size()
+              + " fields and tried to access offset " + rowIdOffset);
+    }
+
+    StructField field = fields.get(rowIdOffset);
+    Object value = columnValues.get(rowIdOffset);
+
+    // The ObjectInspector for the row ID
+    ObjectInspector fieldObjectInspector = field.getFieldObjectInspector();
+
+    log.info("Serializing rowId with " + value + " in " + field + " using "
+        + rowIdFactory.getClass());
+
+    // Serialize the row component using the RowIdFactory. In the normal case, this will just
+    // delegate back to the "local" serializeRowId method
+    byte[] data = rowIdFactory.serializeRowId(value, field, output);
+
+    // Set that as the row id in the mutation
+    Mutation mutation = new Mutation(data);
+
+    // Each column in the row
+    for (int i = 0; i < fields.size(); i++) {
+      if (rowIdOffset == i) {
+        continue;
+      }
+
+      // Get the relevant information for this column
+      field = fields.get(i);
+      value = columnValues.get(i);
+
+      // Despite having a fixed schema from Hive, we have sparse columns in Accumulo
+      if (null == value) {
+        continue;
+      }
+
+      // The ObjectInspector for the current column
+      fieldObjectInspector = field.getFieldObjectInspector();
+
+      // Make sure we got the right implementation of a ColumnMapping
+      ColumnMapping mapping = mappings.get(i);
+      if (mapping instanceof HiveAccumuloColumnMapping) {
+        serializeColumnMapping((HiveAccumuloColumnMapping) mapping, fieldObjectInspector, value,
+            mutation);
+      } else if (mapping instanceof HiveAccumuloMapColumnMapping) {
+        serializeColumnMapping((HiveAccumuloMapColumnMapping) mapping, fieldObjectInspector, value,
+            mutation);
+      } else {
+        throw new IllegalArgumentException("Mapping for " + field.getFieldName()
+            + " was not a HiveColumnMapping, but was " + mapping.getClass());
+      }
+
+    }
+
+    return mutation;
+  }
+
+  protected void serializeColumnMapping(HiveAccumuloColumnMapping columnMapping,
+      ObjectInspector fieldObjectInspector, Object value, Mutation mutation) throws IOException {
+    // Get the serialized value for the column
+    byte[] serializedValue = getSerializedValue(fieldObjectInspector, value, output, columnMapping);
+
+    // Put it all in the Mutation
+    mutation.put(columnMapping.getColumnFamilyBytes(), columnMapping.getColumnQualifierBytes(),
+        visibility, serializedValue);
+  }
+
+  /**
+   * Serialize the Hive Map into the Accumulo Mutation, one Accumulo column per map entry
+   */
+  protected void serializeColumnMapping(HiveAccumuloMapColumnMapping columnMapping,
+      ObjectInspector fieldObjectInspector, Object value, Mutation mutation) throws IOException {
+    MapObjectInspector mapObjectInspector = (MapObjectInspector) fieldObjectInspector;
+
+    Map<?,?> map = mapObjectInspector.getMap(value);
+    if (map == null) {
+      return;
+    }
+
+    ObjectInspector keyObjectInspector = mapObjectInspector.getMapKeyObjectInspector();
+    ObjectInspector valueObjectInspector = mapObjectInspector.getMapValueObjectInspector();
+
+    byte[] cfBytes = columnMapping.getColumnFamily().getBytes(Charsets.UTF_8);
+    byte[] cqPrefixBytes = columnMapping.getColumnQualifierPrefix().getBytes(Charsets.UTF_8);
+    byte[] cqBytes, valueBytes;
+    for (Entry<?,?> entry : map.entrySet()) {
+      output.reset();
+
+      // If the cq prefix is non-empty, add it to the CQ before we set the mutation
+      if (0 < cqPrefixBytes.length) {
+        output.write(cqPrefixBytes, 0, cqPrefixBytes.length);
+      }
+
+      // Write the "suffix" of the cq
+      writeWithLevel(keyObjectInspector, entry.getKey(), output, columnMapping, 3);
+      cqBytes = output.toByteArray();
+
+      output.reset();
+
+      // Write the value
+      writeWithLevel(valueObjectInspector, entry.getValue(), output, columnMapping, 3);
+      valueBytes = output.toByteArray();
+
+      mutation.put(cfBytes, cqBytes, visibility, valueBytes);
+    }
+  }
+
+  /**
+   * Serialize an Accumulo rowId using the column mapping's declared type and encoding
+   */
+  protected byte[] serializeRowId(Object rowId, StructField rowIdField, ColumnMapping rowIdMapping)
+      throws IOException {
+    if (rowId == null) {
+      throw new IOException("Accumulo rowId cannot be NULL");
+    }
+    // Reset the buffer we're going to use
+    output.reset();
+    ObjectInspector rowIdFieldOI = rowIdField.getFieldObjectInspector();
+    String rowIdMappingType = rowIdMapping.getColumnType();
+    TypeInfo rowIdTypeInfo = TypeInfoUtils.getTypeInfoFromTypeString(rowIdMappingType);
+
+    if (!rowIdFieldOI.getCategory().equals(ObjectInspector.Category.PRIMITIVE)
+        && rowIdTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+      // The declared rowId type is primitive but the actual object is not: fall back to
+      // serializing the object's JSON representation, escaped like a LazyString
+      writeString(output, SerDeUtils.getJSONString(rowId, rowIdFieldOI),
+          PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+      return output.toByteArray();
+    }
+
+    // Use the serialization option to write primitive values as either a variable-length
+    // UTF8 string or fixed-width bytes when serializing in binary format
+    getSerializedValue(rowIdFieldOI, rowId, output, rowIdMapping);
+    return output.toByteArray();
+  }
+
+  /**
+   * Compute the serialized value from the given element and object inspectors. Based on the Hive
+   * types, represented through the ObjectInspectors for the whole object and column within the
+   * object, serialize the object appropriately.
+   *
+   * @param fieldObjectInspector
+   *          ObjectInspector for the column value being serialized
+   * @param value
+   *          The Object itself being serialized
+   * @param output
+   *          A temporary buffer to reduce object creation
+   * @return The serialized bytes from the provided value.
+   * @throws IOException
+   *           An error occurred when performing IO to serialize the data
+   */
+  protected byte[] getSerializedValue(ObjectInspector fieldObjectInspector, Object value,
+      ByteStream.Output output, ColumnMapping mapping) throws IOException {
+    // Reset the buffer we're going to use
+    output.reset();
+
+    // Start by only serializing primitives as-is
+    if (fieldObjectInspector.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+      writeSerializedPrimitive((PrimitiveObjectInspector) fieldObjectInspector, output, value,
+          mapping.getEncoding());
+    } else {
+      // We only accept a struct, which means that we're already nested one level deep
+      writeWithLevel(fieldObjectInspector, value, output, mapping, 2);
+    }
+
+    return output.toByteArray();
+  }
+
+  /**
+   * Recursively serialize an Object using its {@link ObjectInspector}, respecting the
+   * separators defined by the {@link SerDeParameters}.
+   * @param oi ObjectInspector for the current object
+   * @param value The current object
+   * @param output A buffer output is written to
+   * @param mapping The mapping for this Hive column
+   * @param level The current level/offset for the SerDe separator
+   * @throws IOException
+   */
+  protected void writeWithLevel(ObjectInspector oi, Object value, ByteStream.Output output,
+      ColumnMapping mapping, int level) throws IOException {
+    switch (oi.getCategory()) {
+      case PRIMITIVE:
+        if (mapping.getEncoding() == ColumnEncoding.BINARY) {
+          this.writeBinary(output, value, (PrimitiveObjectInspector) oi);
+        } else {
+          this.writeString(output, value, (PrimitiveObjectInspector) oi);
+        }
+        return;
+      case LIST:
+        char separator = (char) serDeParams.getSeparators()[level];
+        ListObjectInspector loi = (ListObjectInspector) oi;
+        List<?> list = loi.getList(value);
+        ObjectInspector eoi = loi.getListElementObjectInspector();
+        if (list == null) {
+          log.debug("No objects found when serializing list");
+          return;
+        } else {
+          for (int i = 0; i < list.size(); i++) {
+            if (i > 0) {
+              output.write(separator);
+            }
+            writeWithLevel(eoi, list.get(i), output, mapping, level + 1);
+          }
+        }
+        return;
+      case MAP:
+        char sep = (char) serDeParams.getSeparators()[level];
+        char keyValueSeparator = (char) serDeParams.getSeparators()[level + 1];
+        MapObjectInspector moi = (MapObjectInspector) oi;
+        ObjectInspector koi = moi.getMapKeyObjectInspector();
+        ObjectInspector voi = moi.getMapValueObjectInspector();
+
+        Map<?,?> map = moi.getMap(value);
+        if (map == null) {
+          log.debug("No object found when serializing map");
+          return;
+        } else {
+          boolean first = true;
+          for (Map.Entry<?,?> entry : map.entrySet()) {
+            if (first) {
+              first = false;
+            } else {
+              output.write(sep);
+            }
+            writeWithLevel(koi, entry.getKey(), output, mapping, level + 2);
+            output.write(keyValueSeparator);
+            writeWithLevel(voi, entry.getValue(), output, mapping, level + 2);
+          }
+        }
+        return;
+      case STRUCT:
+        sep = (char) serDeParams.getSeparators()[level];
+        StructObjectInspector soi = (StructObjectInspector) oi;
+        List<? extends StructField> fields = soi.getAllStructFieldRefs();
+        list = soi.getStructFieldsDataAsList(value);
+        if (list == null) {
+          log.debug("No object found when serializing struct");
+          return;
+        } else {
+          for (int i = 0; i < list.size(); i++) {
+            if (i > 0) {
+              output.write(sep);
+            }
+
+            writeWithLevel(fields.get(i).getFieldObjectInspector(), list.get(i), output, mapping,
+                level + 1);
+          }
+        }
+
+        return;
+      default:
+        throw new RuntimeException("Unknown category type: " + oi.getCategory());
+    }
+  }
+
+  /**
+   * Serialize the given primitive to the given output buffer, using the provided encoding
+   * mechanism.
+   *
+   * @param objectInspector
+   *          The PrimitiveObjectInspector for this Object
+   * @param output
+   *          A buffer to write the serialized value to
+   * @param value
+   *          The Object being serialized
+   * @param encoding
+   *          The means in which the Object should be serialized
+   * @throws IOException
+   */
+  protected void writeSerializedPrimitive(PrimitiveObjectInspector objectInspector,
+      ByteStream.Output output, Object value, ColumnEncoding encoding) throws IOException {
+    // Despite STRING being a primitive, it can't be serialized as binary
+    if (objectInspector.getPrimitiveCategory() != PrimitiveCategory.STRING && ColumnEncoding.BINARY == encoding) {
+      writeBinary(output, value, objectInspector);
+    } else {
+      writeString(output, value, objectInspector);
+    }
+  }
+
+  protected void writeBinary(ByteStream.Output output, Object value,
+      PrimitiveObjectInspector inspector) throws IOException {
+    LazyUtils.writePrimitive(output, value, inspector);
+  }
+
+  protected void writeString(ByteStream.Output output, Object value,
+      PrimitiveObjectInspector inspector) throws IOException {
+    LazyUtils.writePrimitiveUTF8(output, value, inspector, serDeParams.isEscaped(),
+        serDeParams.getEscapeChar(), serDeParams.getNeedsEscape());
+  }
+
+  protected ColumnVisibility getVisibility() {
+    return visibility;
+  }
+}

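The level arithmetic in writeWithLevel is the subtle part: getSerializedValue enters at level 2 for any non-primitive column, lists consume one separator index per depth, and map entries consume two (one between entries, one between key and value). The following is a standalone sketch of that scheme, not the real implementation, and it assumes LazySimpleSerDe's conventional default separators ('\001', '\002', '\003', ...).

import java.util.Arrays;
import java.util.List;

/**
 * Standalone illustration of the level-based separator scheme used by
 * AccumuloRowSerializer#writeWithLevel (hypothetical demo, not the real class).
 */
public class SeparatorLevelDemo {
  // Default LazySimpleSerDe-style separators; index 0 is the top-level field separator
  private static final char[] SEPARATORS = {'\001', '\002', '\003', '\004'};

  /** Join a (possibly nested) list using the separator for the given level. */
  static String join(Object value, int level) {
    if (!(value instanceof List)) {
      return String.valueOf(value);
    }
    StringBuilder sb = new StringBuilder();
    List<?> list = (List<?>) value;
    for (int i = 0; i < list.size(); i++) {
      if (i > 0) {
        sb.append(SEPARATORS[level]);
      }
      // Each nesting level moves one separator deeper, as in writeWithLevel
      sb.append(join(list.get(i), level + 1));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // A non-primitive column starts at level 2, so the outer list uses '\003'
    // and the inner lists use '\004'
    List<?> value = Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
    String serialized = join(value, 2);
    // Substitute printable characters so the nesting is visible
    System.out.println(serialized.replace('\003', ';').replace('\004', ','));
    // prints: a,b;c
  }
}
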
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDe.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDe.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDe.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.serde;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.AccumuloHiveRow;
+import org.apache.hadoop.hive.accumulo.LazyAccumuloRow;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMapping;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloRowIdColumnMapping;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SerDe for the Accumulo storage handler: deserializes an AccumuloHiveRow into a
+ * LazyAccumuloRow for Hive, and serializes Hive structs into Accumulo Mutations.
+ */
+public class AccumuloSerDe implements SerDe {
+
+  private AccumuloSerDeParameters accumuloSerDeParameters;
+  private LazyAccumuloRow cachedRow;
+  private ObjectInspector cachedObjectInspector;
+  private AccumuloRowSerializer serializer;
+
+  private static final Logger log = LoggerFactory.getLogger(AccumuloSerDe.class);
+
+  public void initialize(Configuration conf, Properties properties) throws SerDeException {
+    accumuloSerDeParameters = new AccumuloSerDeParameters(conf, properties, getClass().getName());
+
+    final SerDeParameters serDeParams = accumuloSerDeParameters.getSerDeParameters();
+    final List<ColumnMapping> mappings = accumuloSerDeParameters.getColumnMappings();
+    final List<TypeInfo> columnTypes = accumuloSerDeParameters.getHiveColumnTypes();
+    final AccumuloRowIdFactory factory = accumuloSerDeParameters.getRowIdFactory();
+
+    ArrayList<ObjectInspector> columnObjectInspectors = getColumnObjectInspectors(columnTypes, serDeParams, mappings, factory);
+
+    cachedObjectInspector = LazyObjectInspectorFactory.getLazySimpleStructObjectInspector(
+        serDeParams.getColumnNames(), columnObjectInspectors, serDeParams.getSeparators()[0],
+        serDeParams.getNullSequence(), serDeParams.isLastColumnTakesRest(),
+        serDeParams.isEscaped(), serDeParams.getEscapeChar());
+
+    cachedRow = new LazyAccumuloRow((LazySimpleStructObjectInspector) cachedObjectInspector);
+
+    serializer = new AccumuloRowSerializer(accumuloSerDeParameters.getRowIdOffset(),
+        accumuloSerDeParameters.getSerDeParameters(), accumuloSerDeParameters.getColumnMappings(),
+        accumuloSerDeParameters.getTableVisibilityLabel(),
+        accumuloSerDeParameters.getRowIdFactory());
+
+    if (log.isInfoEnabled()) {
+      log.info("Initialized with {} type: {}", accumuloSerDeParameters.getSerDeParameters()
+          .getColumnNames(), accumuloSerDeParameters.getSerDeParameters().getColumnTypes());
+    }
+  }
+
+  protected ArrayList<ObjectInspector> getColumnObjectInspectors(List<TypeInfo> columnTypes,
+      SerDeParameters serDeParams, List<ColumnMapping> mappings, AccumuloRowIdFactory factory)
+      throws SerDeException {
+    ArrayList<ObjectInspector> columnObjectInspectors = new ArrayList<ObjectInspector>(
+        columnTypes.size());
+    for (int i = 0; i < columnTypes.size(); i++) {
+      TypeInfo type = columnTypes.get(i);
+      ColumnMapping mapping = mappings.get(i);
+      if (mapping instanceof HiveAccumuloRowIdColumnMapping) {
+        columnObjectInspectors.add(factory.createRowIdObjectInspector(type));
+      } else {
+        columnObjectInspectors.add(LazyFactory.createLazyObjectInspector(type,
+            serDeParams.getSeparators(), 1, serDeParams.getNullSequence(), serDeParams.isEscaped(),
+            serDeParams.getEscapeChar()));
+      }
+    }
+
+    return columnObjectInspectors;
+  }
+
+  /**
+   * For testing purposes.
+   */
+  public LazyAccumuloRow getCachedRow() {
+    return cachedRow;
+  }
+
+  public Class<? extends Writable> getSerializedClass() {
+    return Mutation.class;
+  }
+
+  @Override
+  public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
+    try {
+      return serializer.serialize(o, objectInspector);
+    } catch (IOException e) {
+      throw new SerDeException(e);
+    }
+  }
+
+  @Override
+  public Object deserialize(Writable writable) throws SerDeException {
+    if (!(writable instanceof AccumuloHiveRow)) {
+      throw new SerDeException(getClass().getName() + " : " + "Expected AccumuloHiveRow. Got "
+          + writable.getClass().getName());
+    }
+
+    cachedRow.init((AccumuloHiveRow) writable, accumuloSerDeParameters.getColumnMappings(),
+        accumuloSerDeParameters.getRowIdFactory());
+
+    return cachedRow;
+  }
+
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return cachedObjectInspector;
+  }
+
+  public SerDeStats getSerDeStats() {
+    throw new UnsupportedOperationException("SerdeStats not supported.");
+  }
+
+  public AccumuloSerDeParameters getParams() {
+    return accumuloSerDeParameters;
+  }
+
+  public boolean getIteratorPushdown() {
+    return accumuloSerDeParameters.getIteratorPushdown();
+  }
+
+  protected AccumuloRowSerializer getSerializer() {
+    return serializer;
+  }
+}
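
Finally, a rough sketch of driving this SerDe directly, outside of a Hive query, to see the serialize path end to end. The property names used here (columns, columns.types, and accumulo.columns.mapping with the :rowID token) are assumptions about the handler's table-level configuration and are not defined in this file; treat this as a sketch under those assumptions rather than a definitive usage.

import java.util.Arrays;
import java.util.Properties;

import org.apache.accumulo.core.data.Mutation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class AccumuloSerDeSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Hypothetical table definition: first Hive column is the Accumulo rowId,
    // second lands in column family "cf", qualifier "name"
    props.setProperty("columns", "rowid,name");
    props.setProperty("columns.types", "string,string");
    props.setProperty("accumulo.columns.mapping", ":rowID,cf:name");

    AccumuloSerDe serde = new AccumuloSerDe();
    serde.initialize(new Configuration(), props);

    // A standard (non-lazy) object inspector for the row handed to serialize()
    StructObjectInspector rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("rowid", "name"),
        Arrays.<ObjectInspector> asList(
            PrimitiveObjectInspectorFactory.javaStringObjectInspector,
            PrimitiveObjectInspectorFactory.javaStringObjectInspector));

    // Serializing a struct yields a Mutation keyed on the rowId field
    Mutation m = (Mutation) serde.serialize(Arrays.asList("row1", "value1"), rowOI);
    System.out.println("rowId: " + new String(m.getRow(), "UTF-8"));
  }
}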