Posted to commits@hive.apache.org by kh...@apache.org on 2014/08/20 00:41:13 UTC
svn commit: r1619005 [3/9] - in /hive/trunk: ./ accumulo-handler/
accumulo-handler/src/ accumulo-handler/src/java/
accumulo-handler/src/java/org/ accumulo-handler/src/java/org/apache/
accumulo-handler/src/java/org/apache/hadoop/ accumulo-handler/src/ja...
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.predicate;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloRowIdColumnMapping;
+import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
+import org.apache.hadoop.hive.accumulo.predicate.compare.Equal;
+import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThan;
+import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThanOrEqual;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LessThan;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LessThanOrEqual;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantLongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantShortObjectInspector;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates Accumulo Ranges from the Hive predicate over the column mapped to the Accumulo rowid.
+ */
+public class AccumuloRangeGenerator implements NodeProcessor {
+ private static final Logger log = LoggerFactory.getLogger(AccumuloRangeGenerator.class);
+
+ private final AccumuloPredicateHandler predicateHandler;
+ private final HiveAccumuloRowIdColumnMapping rowIdMapping;
+ private final String hiveRowIdColumnName;
+
+ public AccumuloRangeGenerator(AccumuloPredicateHandler predicateHandler,
+ HiveAccumuloRowIdColumnMapping rowIdMapping, String hiveRowIdColumnName) {
+ this.predicateHandler = predicateHandler;
+ this.rowIdMapping = rowIdMapping;
+ this.hiveRowIdColumnName = hiveRowIdColumnName;
+ }
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+ throws SemanticException {
+ // If it's not some operator, pass it back
+ if (!(nd instanceof ExprNodeGenericFuncDesc)) {
+ return nd;
+ }
+
+ ExprNodeGenericFuncDesc func = (ExprNodeGenericFuncDesc) nd;
+
+ // 'and' nodes need to be intersected
+ if (FunctionRegistry.isOpAnd(func)) {
+ return processAndOpNode(nd, nodeOutputs);
+ // 'or' nodes need to be merged
+ } else if (FunctionRegistry.isOpOr(func)) {
+ return processOrOpNode(nd, nodeOutputs);
+ } else if (FunctionRegistry.isOpNot(func)) {
+ // TODO handle negations
+ throw new IllegalArgumentException("Negations not yet implemented");
+ } else {
+ return processExpression(func, nodeOutputs);
+ }
+ }
+
+ protected Object processAndOpNode(Node nd, Object[] nodeOutputs) {
+ // We might have multiple ranges coming from children
+ List<Range> andRanges = null;
+
+ for (Object nodeOutput : nodeOutputs) {
+ // null signifies nodes that are irrelevant to the generation
+ // of Accumulo Ranges
+ if (null == nodeOutput) {
+ continue;
+ }
+
+ // When an AND has no children (some conjunction over a field that isn't the column
+ // mapped to the Accumulo rowid) and when a conjunction generates Ranges which are empty
+ // (the children of the conjunction are disjoint), these two cases need to be kept separate.
+ //
+ // A null `andRanges` implies that ranges couldn't be computed, while an empty List
+ // of Ranges implies that there are no possible Ranges to lookup.
+ if (null == andRanges) {
+ andRanges = new ArrayList<Range>();
+ }
+
+ // The child is a single Range
+ if (nodeOutput instanceof Range) {
+ Range childRange = (Range) nodeOutput;
+
+ // No existing ranges, just accept the current
+ if (andRanges.isEmpty()) {
+ andRanges.add(childRange);
+ } else {
+ // For each range we have, intersect them. If they don't overlap
+ // the range can be discarded
+ List<Range> newRanges = new ArrayList<Range>();
+ for (Range andRange : andRanges) {
+ Range intersectedRange = andRange.clip(childRange, true);
+ if (null != intersectedRange) {
+ newRanges.add(intersectedRange);
+ }
+ }
+
+ // Set the newly-constructed ranges as the current state
+ andRanges = newRanges;
+ }
+ } else if (nodeOutput instanceof List) {
+ @SuppressWarnings("unchecked")
+ List<Range> childRanges = (List<Range>) nodeOutput;
+
+ // No ranges, use the ranges from the child
+ if (andRanges.isEmpty()) {
+ andRanges.addAll(childRanges);
+ } else {
+ List<Range> newRanges = new ArrayList<Range>();
+
+ // Cartesian product of our ranges, to the child ranges
+ for (Range andRange : andRanges) {
+ for (Range childRange : childRanges) {
+ Range intersectedRange = andRange.clip(childRange, true);
+
+ // Retain only valid intersections (discard disjoint ranges)
+ if (null != intersectedRange) {
+ newRanges.add(intersectedRange);
+ }
+ }
+ }
+
+ // Set the newly-constructed ranges as the current state
+ andRanges = newRanges;
+ }
+ } else {
+ log.error("Expected Range from {} but got {}", nd, nodeOutput);
+ throw new IllegalArgumentException("Expected Range but got "
+ + nodeOutput.getClass().getName());
+ }
+ }
+
+ return andRanges;
+ }
+
+ protected Object processOrOpNode(Node nd, Object[] nodeOutputs) {
+ List<Range> orRanges = new ArrayList<Range>(nodeOutputs.length);
+ for (Object nodeOutput : nodeOutputs) {
+ if (nodeOutput instanceof Range) {
+ orRanges.add((Range) nodeOutput);
+ } else if (nodeOutput instanceof List) {
+ @SuppressWarnings("unchecked")
+ List<Range> childRanges = (List<Range>) nodeOutput;
+ orRanges.addAll(childRanges);
+ } else {
+ log.error("Expected Range from " + nd + " but got " + nodeOutput);
+ throw new IllegalArgumentException("Expected Range but got "
+ + nodeOutput.getClass().getName());
+ }
+ }
+
+ // Try to merge multiple ranges together
+ if (orRanges.size() > 1) {
+ return Range.mergeOverlapping(orRanges);
+ } else if (1 == orRanges.size()) {
+ // Return just the single Range
+ return orRanges.get(0);
+ } else {
+ // No ranges, just return the empty list
+ return orRanges;
+ }
+ }
+
+ protected Object processExpression(ExprNodeGenericFuncDesc func, Object[] nodeOutputs)
+ throws SemanticException {
+ // a binary operator (gt, lt, ge, le, eq, ne)
+ GenericUDF genericUdf = func.getGenericUDF();
+
+ // Find the argument to the operator which is a constant
+ ExprNodeConstantDesc constantDesc = null;
+ ExprNodeColumnDesc columnDesc = null;
+ ExprNodeDesc leftHandNode = null;
+ for (Object nodeOutput : nodeOutputs) {
+ if (nodeOutput instanceof ExprNodeConstantDesc) {
+ // Ordering of constant and column in expression is important in correct range generation
+ if (null == leftHandNode) {
+ leftHandNode = (ExprNodeDesc) nodeOutput;
+ }
+
+ constantDesc = (ExprNodeConstantDesc) nodeOutput;
+ } else if (nodeOutput instanceof ExprNodeColumnDesc) {
+ // Ordering of constant and column in expression is important in correct range generation
+ if (null == leftHandNode) {
+ leftHandNode = (ExprNodeDesc) nodeOutput;
+ }
+
+ columnDesc = (ExprNodeColumnDesc) nodeOutput;
+ }
+ }
+
+ // If it's constant = constant or column = column, we can't fetch any ranges
+ // TODO We can try to be smarter and push up the value to some node which
+ // we can generate ranges from e.g. rowid > (4 + 5)
+ if (null == constantDesc || null == columnDesc) {
+ return null;
+ }
+
+ // Reject any clauses that are against a column that isn't the rowId mapping
+ if (!this.hiveRowIdColumnName.equals(columnDesc.getColumn())) {
+ return null;
+ }
+
+ ConstantObjectInspector objInspector = constantDesc.getWritableObjectInspector();
+
+ Text constText;
+ switch (rowIdMapping.getEncoding()) {
+ case STRING:
+ constText = getUtf8Value(objInspector);
+ break;
+ case BINARY:
+ try {
+ constText = getBinaryValue(objInspector);
+ } catch (IOException e) {
+ throw new SemanticException(e);
+ }
+ break;
+ default:
+ throw new SemanticException("Unable to parse unknown encoding: "
+ + rowIdMapping.getEncoding());
+ }
+
+ Class<? extends CompareOp> opClz;
+ try {
+ opClz = predicateHandler.getCompareOpClass(genericUdf.getUdfName());
+ } catch (NoSuchCompareOpException e) {
+ throw new IllegalArgumentException("Unhandled UDF class: " + genericUdf.getUdfName());
+ }
+
+ if (leftHandNode instanceof ExprNodeConstantDesc) {
+ return getConstantOpColumnRange(opClz, constText);
+ } else if (leftHandNode instanceof ExprNodeColumnDesc) {
+ return getColumnOpConstantRange(opClz, constText);
+ } else {
+ throw new IllegalStateException("Expected column or constant on LHS of expression");
+ }
+ }
+
+ protected Range getConstantOpColumnRange(Class<? extends CompareOp> opClz, Text constText) {
+ if (opClz.equals(Equal.class)) {
+ // 100 == x
+ return new Range(constText); // single row
+ } else if (opClz.equals(GreaterThanOrEqual.class)) {
+ // 100 >= x
+ return new Range(null, constText); // neg-infinity to end inclusive
+ } else if (opClz.equals(GreaterThan.class)) {
+ // 100 > x
+ return new Range(null, false, constText, false); // neg-infinity to end exclusive
+ } else if (opClz.equals(LessThanOrEqual.class)) {
+ // 100 <= x
+ return new Range(constText, true, null, false); // start inclusive to infinity
+ } else if (opClz.equals(LessThan.class)) {
+ // 100 < x
+ return new Range(constText, false, null, false); // start exclusive to infinity
+ } else {
+ throw new IllegalArgumentException("Could not process " + opClz);
+ }
+ }
+
+ protected Range getColumnOpConstantRange(Class<? extends CompareOp> opClz, Text constText) {
+ if (opClz.equals(Equal.class)) {
+ return new Range(constText); // start inclusive to end inclusive
+ } else if (opClz.equals(GreaterThanOrEqual.class)) {
+ return new Range(constText, null); // start inclusive to infinity inclusive
+ } else if (opClz.equals(GreaterThan.class)) {
+ return new Range(constText, false, null, false); // start exclusive to infinity inclusive
+ } else if (opClz.equals(LessThanOrEqual.class)) {
+ return new Range(null, false, constText, true); // neg-infinity to end inclusive
+ } else if (opClz.equals(LessThan.class)) {
+ return new Range(null, false, constText, false); // neg-infinity to end exclusive
+ } else {
+ throw new IllegalArgumentException("Could not process " + opClz);
+ }
+ }
+
+ protected Text getUtf8Value(ConstantObjectInspector objInspector) {
+ // TODO is there a more correct way to get the literal value for the Object?
+ return new Text(objInspector.getWritableConstantValue().toString());
+ }
+
+ /**
+ * Attempts to construct the binary value from the given inspector. Falls back to UTF8 encoding
+ * when the value cannot be coerced into binary.
+ *
+ * @return Binary value when possible, utf8 otherwise
+ * @throws IOException
+ */
+ protected Text getBinaryValue(ConstantObjectInspector objInspector) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ if (objInspector instanceof WritableConstantBooleanObjectInspector) {
+ LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+ (WritableConstantBooleanObjectInspector) objInspector);
+ } else if (objInspector instanceof WritableConstantByteObjectInspector) {
+ LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+ (WritableConstantByteObjectInspector) objInspector);
+ } else if (objInspector instanceof WritableConstantShortObjectInspector) {
+ LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+ (WritableConstantShortObjectInspector) objInspector);
+ } else if (objInspector instanceof WritableConstantIntObjectInspector) {
+ LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+ (WritableConstantIntObjectInspector) objInspector);
+ } else if (objInspector instanceof WritableConstantLongObjectInspector) {
+ LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+ (WritableConstantLongObjectInspector) objInspector);
+ } else if (objInspector instanceof WritableConstantDoubleObjectInspector) {
+ LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+ (WritableConstantDoubleObjectInspector) objInspector);
+ } else if (objInspector instanceof WritableConstantFloatObjectInspector) {
+ LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+ (WritableConstantFloatObjectInspector) objInspector);
+ } else {
+ return getUtf8Value(objInspector);
+ }
+
+ out.close();
+ return new Text(out.toByteArray());
+ }
+}
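
For illustration, a minimal sketch (not part of this commit) of the two Accumulo Range operations the generator leans on: Range.clip to intersect the children of an AND node and Range.mergeOverlapping to combine the children of an OR node. The rowid boundaries "f" and "m" and the class name are hypothetical.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.accumulo.core.data.Range;
    import org.apache.hadoop.io.Text;

    public class RangeCombinationSketch {
      public static void main(String[] args) {
        Range ge = new Range(new Text("f"), true, null, false);   // rowid >= 'f'
        Range lt = new Range(null, false, new Text("m"), false);  // rowid < 'm'

        // AND: intersect; clip(other, true) returns null when the ranges are disjoint
        Range and = ge.clip(lt, true);                             // [f, m)

        // OR: merge any overlapping ranges into a minimal covering set
        List<Range> or = Range.mergeOverlapping(Arrays.asList(ge, lt));

        System.out.println(and + " / " + or);
      }
    }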
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchCompareOpException.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchCompareOpException.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchCompareOpException.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchCompareOpException.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.predicate;
+
+/**
+ * Used when a CompareOp was specified but one with that name cannot be found.
+ */
+public class NoSuchCompareOpException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public NoSuchCompareOpException() {
+ super();
+ }
+
+ public NoSuchCompareOpException(String msg) {
+ super(msg);
+ }
+
+ public NoSuchCompareOpException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchPrimitiveComparisonException.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchPrimitiveComparisonException.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchPrimitiveComparisonException.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchPrimitiveComparisonException.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.predicate;
+
+import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;
+
+/**
+ * Used when a {@link PrimitiveComparison} was specified but one with that name cannot be found
+ */
+public class NoSuchPrimitiveComparisonException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public NoSuchPrimitiveComparisonException() {
+ super();
+ }
+
+ public NoSuchPrimitiveComparisonException(String msg) {
+ super(msg);
+ }
+
+ public NoSuchPrimitiveComparisonException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,123 @@
+package org.apache.hadoop.hive.accumulo.predicate;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMappingFactory;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping;
+import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
+import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Operates over a single qualifier.
+ *
+ * Delegates to PrimitiveComparison and CompareOp instances for value acceptance.
+ *
+ * The PrimitiveComparison strategy assumes a consistent value type for the same column family and
+ * qualifier.
+ */
+public class PrimitiveComparisonFilter extends WholeRowIterator {
+ @SuppressWarnings("unused")
+ private static final Logger log = Logger.getLogger(PrimitiveComparisonFilter.class);
+
+ public static final String FILTER_PREFIX = "accumulo.filter.compare.iterator.";
+ public static final String P_COMPARE_CLASS = "accumulo.filter.iterator.p.compare.class";
+ public static final String COMPARE_OPT_CLASS = "accumulo.filter.iterator.compare.opt.class";
+ public static final String CONST_VAL = "accumulo.filter.iterator.const.val";
+ public static final String COLUMN = "accumulo.filter.iterator.qual";
+
+ private Text cfHolder, cqHolder, columnMappingFamily, columnMappingQualifier;
+ private HiveAccumuloColumnMapping columnMapping;
+ private CompareOp compOpt;
+
+ @Override
+ protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) {
+ SortedMap<Key,Value> items;
+ boolean allow;
+ try { // if key doesn't contain CF, it's an encoded value from a previous iterator.
+ while (keys.get(0).getColumnFamily().getBytes().length == 0) {
+ items = decodeRow(keys.get(0), values.get(0));
+ keys = Lists.newArrayList(items.keySet());
+ values = Lists.newArrayList(items.values());
+ }
+ allow = accept(keys, values);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return allow;
+ }
+
+ private boolean accept(Collection<Key> keys, Collection<Value> values) {
+ Iterator<Key> kIter = keys.iterator();
+ Iterator<Value> vIter = values.iterator();
+ while (kIter.hasNext()) {
+ Key k = kIter.next();
+ Value v = vIter.next();
+ if (matchQualAndFam(k)) {
+ return compOpt.accept(v.get());
+ }
+ }
+ return false;
+ }
+
+ private boolean matchQualAndFam(Key k) {
+ k.getColumnFamily(cfHolder);
+ k.getColumnQualifier(cqHolder);
+ return cfHolder.equals(columnMappingFamily) && cqHolder.equals(columnMappingQualifier);
+ }
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+ IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ String serializedColumnMapping = options.get(COLUMN);
+ Entry<String,String> pair = ColumnMappingFactory.parseMapping(serializedColumnMapping);
+
+ // The ColumnEncoding, column name and type are all irrelevant at this point, just need the
+ // cf:[cq]
+ columnMapping = new HiveAccumuloColumnMapping(pair.getKey(), pair.getValue(),
+ ColumnEncoding.STRING, "column", "string");
+ columnMappingFamily = new Text(columnMapping.getColumnFamily());
+ columnMappingQualifier = new Text(columnMapping.getColumnQualifier());
+ cfHolder = new Text();
+ cqHolder = new Text();
+
+ try {
+ Class<?> pClass = Class.forName(options.get(P_COMPARE_CLASS));
+ Class<?> cClazz = Class.forName(options.get(COMPARE_OPT_CLASS));
+ PrimitiveComparison pCompare = pClass.asSubclass(PrimitiveComparison.class).newInstance();
+ compOpt = cClazz.asSubclass(CompareOp.class).newInstance();
+ byte[] constant = getConstant(options);
+ pCompare.init(constant);
+ compOpt.setPrimitiveCompare(pCompare);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ } catch (InstantiationException e) {
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(e);
+ }
+ }
+
+ protected byte[] getConstant(Map<String,String> options) {
+ String b64Const = options.get(CONST_VAL);
+ return Base64.decodeBase64(b64Const.getBytes());
+ }
+}
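
For reference, a hedged sketch (not part of this commit) of wiring this iterator onto a scan by hand with the option keys defined above. The priority, iterator name, column, and constant are made up; in practice the storage handler derives these options from the pushed-down predicate.

    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.commons.codec.binary.Base64;
    import org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter;
    import org.apache.hadoop.hive.accumulo.predicate.compare.Equal;
    import org.apache.hadoop.hive.accumulo.predicate.compare.StringCompare;

    public class FilterSetupSketch {
      public static IteratorSetting stringEquals(String cfCq, String constant) {
        IteratorSetting is = new IteratorSetting(50, "hive-pushdown", PrimitiveComparisonFilter.class);
        is.addOption(PrimitiveComparisonFilter.COLUMN, cfCq);  // "cf:cq" the comparison applies to
        is.addOption(PrimitiveComparisonFilter.P_COMPARE_CLASS, StringCompare.class.getName());
        is.addOption(PrimitiveComparisonFilter.COMPARE_OPT_CLASS, Equal.class.getName());
        // the constant travels Base64-encoded; see getConstant(Map) above
        is.addOption(PrimitiveComparisonFilter.CONST_VAL, new String(Base64.encodeBase64(constant.getBytes())));
        return is;
      }
    }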
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PushdownTuple.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PushdownTuple.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PushdownTuple.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PushdownTuple.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.predicate;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
+import org.apache.hadoop.hive.accumulo.predicate.compare.DoubleCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.IntCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LongCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;
+import org.apache.hadoop.hive.accumulo.predicate.compare.StringCompare;
+import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
+import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * For use in IteratorSetting construction.
+ *
+ * Encapsulates a constant byte[], a PrimitiveComparison instance, and a CompareOp instance.
+ */
+public class PushdownTuple {
+ private static final Logger log = Logger.getLogger(PushdownTuple.class);
+
+ private byte[] constVal;
+ private PrimitiveComparison pCompare;
+ private CompareOp cOpt;
+
+ public PushdownTuple(IndexSearchCondition sc, PrimitiveComparison pCompare, CompareOp cOpt)
+ throws SerDeException {
+ ExprNodeConstantEvaluator eval = new ExprNodeConstantEvaluator(sc.getConstantDesc());
+
+ try {
+ this.pCompare = pCompare;
+ this.cOpt = cOpt;
+ Writable writable = (Writable) eval.evaluate(null);
+ constVal = getConstantAsBytes(writable);
+ } catch (ClassCastException cce) {
+ log.info(StringUtils.stringifyException(cce));
+ throw new SerDeException(" Column type mismatch in where clause "
+ + sc.getComparisonExpr().getExprString() + " found type "
+ + sc.getConstantDesc().getTypeString() + " instead of "
+ + sc.getColumnDesc().getTypeString());
+ } catch (HiveException e) {
+ throw new SerDeException(e);
+ }
+ }
+
+ public byte[] getConstVal() {
+ return constVal;
+ }
+
+ public PrimitiveComparison getpCompare() {
+ return pCompare;
+ }
+
+ public CompareOp getcOpt() {
+ return cOpt;
+ }
+
+ /**
+ * Converts the constant Writable into the byte[] encoding expected by the PrimitiveComparison.
+ * @return byte[] value from the writable.
+ * @throws SerDeException if the writable's type is not supported
+ */
+ public byte[] getConstantAsBytes(Writable writable) throws SerDeException {
+ if (pCompare instanceof StringCompare) {
+ return writable.toString().getBytes();
+ } else if (pCompare instanceof DoubleCompare) {
+ byte[] bts = new byte[8];
+ double val = ((DoubleWritable) writable).get();
+ ByteBuffer.wrap(bts).putDouble(val);
+ return bts;
+ } else if (pCompare instanceof IntCompare) {
+ byte[] bts = new byte[4];
+ int val = ((IntWritable) writable).get();
+ ByteBuffer.wrap(bts).putInt(val);
+ return bts;
+ } else if (pCompare instanceof LongCompare) {
+ byte[] bts = new byte[8];
+ long val = ((LongWritable) writable).get();
+ ByteBuffer.wrap(bts).putLong(val);
+ return bts;
+ } else {
+ throw new SerDeException("Unsupported primitive category: " + pCompare.getClass().getName());
+ }
+ }
+
+}
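
As a worked illustration (not part of this commit), the byte[] that getConstantAsBytes produces uses the same ByteBuffer layout that IntCompare, LongCompare, and DoubleCompare decode in their serialize methods, so the two sides agree on the encoding. The literal values below are arbitrary.

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hive.accumulo.predicate.compare.IntCompare;

    public class EncodingRoundTripSketch {
      public static void main(String[] args) {
        // PushdownTuple encodes an int constant as 4 big-endian bytes
        byte[] constBytes = new byte[4];
        ByteBuffer.wrap(constBytes).putInt(100);

        // IntCompare decodes with ByteBuffer as well, so the round trip matches
        IntCompare cmp = new IntCompare();
        cmp.init(constBytes);

        byte[] cellValue = new byte[4];
        ByteBuffer.wrap(cellValue).putInt(150);

        System.out.println(cmp.greaterThan(cellValue));  // true: 150 > 100
      }
    }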
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/CompareOp.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/CompareOp.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/CompareOp.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/CompareOp.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,26 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Handles different types of comparisons in Hive predicates. The filter iterator delegates value
+ * acceptance to the CompareOp.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}. Works with
+ * {@link PrimitiveComparison}
+ */
+public interface CompareOp {
+ /**
+ * Sets the PrimitiveComparison for this CompareOp
+ */
+ public void setPrimitiveCompare(PrimitiveComparison comp);
+
+ /**
+ * @return The PrimitiveComparison this CompareOp is a part of
+ */
+ public PrimitiveComparison getPrimitiveCompare();
+
+ /**
+ * @param val The bytes from the Accumulo Value
+ * @return true if the value is accepted by this CompareOp
+ */
+ public boolean accept(byte[] val);
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/DoubleCompare.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/DoubleCompare.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/DoubleCompare.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/DoubleCompare.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,90 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+/**
+ * Set of comparison operations over a double constant. Used for Hive predicates involving double
+ * comparison.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class DoubleCompare implements PrimitiveComparison {
+
+ private BigDecimal constant;
+
+ /**
+ * Initializes the double constant from its byte[] representation.
+ */
+ public void init(byte[] constant) {
+ this.constant = serialize(constant);
+ }
+
+ /**
+ * @return BigDecimal holding the double parsed from the byte[] value
+ */
+ public BigDecimal serialize(byte[] value) {
+ try {
+ return new BigDecimal(ByteBuffer.wrap(value).asDoubleBuffer().get());
+ } catch (Exception e) {
+ throw new RuntimeException(e.toString() + " occurred trying to build double value. "
+ + "Make sure the value type for the byte[] is double.");
+ }
+ }
+
+ /**
+ * @return true if double value is equal to constant, false otherwise.
+ */
+ @Override
+ public boolean isEqual(byte[] value) {
+ return serialize(value).compareTo(constant) == 0;
+ }
+
+ /**
+ * @return true if double value not equal to constant, false otherwise.
+ */
+ @Override
+ public boolean isNotEqual(byte[] value) {
+ return serialize(value).compareTo(constant) != 0;
+ }
+
+ /**
+ * @return true if value greater than or equal to constant, false otherwise.
+ */
+ @Override
+ public boolean greaterThanOrEqual(byte[] value) {
+ return serialize(value).compareTo(constant) >= 0;
+ }
+
+ /**
+ * @return true if value greater than constant, false otherwise.
+ */
+ @Override
+ public boolean greaterThan(byte[] value) {
+ return serialize(value).compareTo(constant) > 0;
+ }
+
+ /**
+ * @return true if value less than or equal to constant, false otherwise.
+ */
+ @Override
+ public boolean lessThanOrEqual(byte[] value) {
+ return serialize(value).compareTo(constant) <= 0;
+ }
+
+ /**
+ * @return true if value less than constant, false otherwise.
+ */
+ @Override
+ public boolean lessThan(byte[] value) {
+ return serialize(value).compareTo(constant) < 0;
+ }
+
+ /**
+ * not supported for this PrimitiveCompare implementation.
+ */
+ @Override
+ public boolean like(byte[] value) {
+ throw new UnsupportedOperationException("Like not supported for " + getClass().getName());
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Equal.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Equal.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Equal.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Equal.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to isEqual() over a {@link PrimitiveComparison} instance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class Equal implements CompareOp {
+
+ private PrimitiveComparison comp;
+
+ public Equal() {}
+
+ public Equal(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public void setPrimitiveCompare(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public PrimitiveComparison getPrimitiveCompare() {
+ return comp;
+ }
+
+ @Override
+ public boolean accept(byte[] val) {
+ return comp.isEqual(val);
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThan.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThan.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThan.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThan.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to greaterThan over {@link PrimitiveComparison} instance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class GreaterThan implements CompareOp {
+
+ private PrimitiveComparison comp;
+
+ public GreaterThan() {}
+
+ public GreaterThan(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public void setPrimitiveCompare(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public PrimitiveComparison getPrimitiveCompare() {
+ return this.comp;
+ }
+
+ @Override
+ public boolean accept(byte[] val) {
+ return comp.greaterThan(val);
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThanOrEqual.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThanOrEqual.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThanOrEqual.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThanOrEqual.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to greaterThanOrEqual over {@link PrimitiveComparison} instance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class GreaterThanOrEqual implements CompareOp {
+
+ private PrimitiveComparison comp;
+
+ public GreaterThanOrEqual() {}
+
+ public GreaterThanOrEqual(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public void setPrimitiveCompare(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public PrimitiveComparison getPrimitiveCompare() {
+ return comp;
+ }
+
+ @Override
+ public boolean accept(byte[] val) {
+ return comp.greaterThanOrEqual(val);
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/IntCompare.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/IntCompare.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/IntCompare.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/IntCompare.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,63 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Set of comparison operations over an integer constant. Used for Hive predicates involving int
+ * comparison.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class IntCompare implements PrimitiveComparison {
+
+ private int constant;
+
+ @Override
+ public void init(byte[] constant) {
+ this.constant = serialize(constant);
+ }
+
+ @Override
+ public boolean isEqual(byte[] value) {
+ return serialize(value) == constant;
+ }
+
+ @Override
+ public boolean isNotEqual(byte[] value) {
+ return serialize(value) != constant;
+ }
+
+ @Override
+ public boolean greaterThanOrEqual(byte[] value) {
+ return serialize(value) >= constant;
+ }
+
+ @Override
+ public boolean greaterThan(byte[] value) {
+ return serialize(value) > constant;
+ }
+
+ @Override
+ public boolean lessThanOrEqual(byte[] value) {
+ return serialize(value) <= constant;
+ }
+
+ @Override
+ public boolean lessThan(byte[] value) {
+ return serialize(value) < constant;
+ }
+
+ @Override
+ public boolean like(byte[] value) {
+ throw new UnsupportedOperationException("Like not supported for " + getClass().getName());
+ }
+
+ public Integer serialize(byte[] value) {
+ try {
+ return ByteBuffer.wrap(value).asIntBuffer().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e.toString() + " occurred trying to build int value. "
+ + "Make sure the value type for the byte[] is int ");
+ }
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThan.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThan.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThan.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThan.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to lessThan over {@link PrimitiveComparison} instance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class LessThan implements CompareOp {
+
+ private PrimitiveComparison comp;
+
+ public LessThan() {}
+
+ public LessThan(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public void setPrimitiveCompare(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public PrimitiveComparison getPrimitiveCompare() {
+ return comp;
+ }
+
+ @Override
+ public boolean accept(byte[] val) {
+ return comp.lessThan(val);
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThanOrEqual.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThanOrEqual.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThanOrEqual.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThanOrEqual.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to lessThanOrEqual over {@link PrimitiveComparison} instance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class LessThanOrEqual implements CompareOp {
+
+ private PrimitiveComparison comp;
+
+ public LessThanOrEqual() {}
+
+ public LessThanOrEqual(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public void setPrimitiveCompare(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public PrimitiveComparison getPrimitiveCompare() {
+ return comp;
+ }
+
+ @Override
+ public boolean accept(byte[] val) {
+ return comp.lessThanOrEqual(val);
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Like.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Like.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Like.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Like.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,33 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to like over {@link PrimitiveComparison} instance. Currently only supported by
+ * StringCompare.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class Like implements CompareOp {
+
+ PrimitiveComparison comp;
+
+ public Like() {}
+
+ public Like(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public void setPrimitiveCompare(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public PrimitiveComparison getPrimitiveCompare() {
+ return comp;
+ }
+
+ @Override
+ public boolean accept(byte[] val) {
+ return comp.like(val);
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LongCompare.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LongCompare.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LongCompare.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LongCompare.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,64 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Set of comparison operations over a long constant. Used for Hive predicates involving long
+ * comparison.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class LongCompare implements PrimitiveComparison {
+
+ private long constant;
+
+ @Override
+ public void init(byte[] constant) {
+ this.constant = serialize(constant);
+ }
+
+ @Override
+ public boolean isEqual(byte[] value) {
+ long longVal = serialize(value);
+ return longVal == constant;
+ }
+
+ @Override
+ public boolean isNotEqual(byte[] value) {
+ return serialize(value) != constant;
+ }
+
+ @Override
+ public boolean greaterThanOrEqual(byte[] value) {
+ return serialize(value) >= constant;
+ }
+
+ @Override
+ public boolean greaterThan(byte[] value) {
+ return serialize(value) > constant;
+ }
+
+ @Override
+ public boolean lessThanOrEqual(byte[] value) {
+ return serialize(value) <= constant;
+ }
+
+ @Override
+ public boolean lessThan(byte[] value) {
+ return serialize(value) < constant;
+ }
+
+ @Override
+ public boolean like(byte[] value) {
+ throw new UnsupportedOperationException("Like not supported for " + getClass().getName());
+ }
+
+ public Long serialize(byte[] value) {
+ try {
+ return ByteBuffer.wrap(value).asLongBuffer().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e.toString() + " occurred trying to build long value. "
+ + "Make sure the value type for the byte[] is long ");
+ }
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/NotEqual.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/NotEqual.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/NotEqual.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/NotEqual.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to isEqual over {@link PrimitiveComparison} instance and returns the negation.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class NotEqual implements CompareOp {
+
+ private PrimitiveComparison comp;
+
+ public NotEqual() {}
+
+ public NotEqual(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public void setPrimitiveCompare(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public PrimitiveComparison getPrimitiveCompare() {
+ return comp;
+ }
+
+ @Override
+ public boolean accept(byte[] val) {
+ return !comp.isEqual(val);
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/PrimitiveComparison.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/PrimitiveComparison.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/PrimitiveComparison.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/PrimitiveComparison.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps type-specific comparison operations over a constant value. Methods take the raw bytes
+ * from incoming Accumulo values.
+ *
+ * The CompareOp instance in the iterator uses one or more methods from a PrimitiveComparison
+ * implementation to perform type-specific comparisons and determine acceptance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}. Works with
+ * {@link CompareOp}
+ */
+public interface PrimitiveComparison {
+
+ public boolean isEqual(byte[] value);
+
+ public boolean isNotEqual(byte[] value);
+
+ public boolean greaterThanOrEqual(byte[] value);
+
+ public boolean greaterThan(byte[] value);
+
+ public boolean lessThanOrEqual(byte[] value);
+
+ public boolean lessThan(byte[] value);
+
+ public boolean like(byte[] value);
+
+ public Object serialize(byte[] value);
+
+ public void init(byte[] constant);
+}
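
For illustration (not part of this commit), the CompareOp/PrimitiveComparison split lets an operator such as GreaterThan be composed with any type-specific comparison. A minimal sketch with arbitrary long values:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
    import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThan;
    import org.apache.hadoop.hive.accumulo.predicate.compare.LongCompare;
    import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;

    public class CompareCompositionSketch {
      public static void main(String[] args) {
        PrimitiveComparison longCompare = new LongCompare();
        longCompare.init(longBytes(10L));               // the constant from the predicate

        CompareOp op = new GreaterThan(longCompare);
        System.out.println(op.accept(longBytes(42L)));  // true: 42 > 10
        System.out.println(op.accept(longBytes(3L)));   // false
      }

      private static byte[] longBytes(long v) {
        byte[] b = new byte[8];
        ByteBuffer.wrap(b).putLong(v);
        return b;
      }
    }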
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/StringCompare.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/StringCompare.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/StringCompare.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/StringCompare.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,65 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Set of comparison operations over a string constant. Used for Hive predicates involving string
+ * comparison.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class StringCompare implements PrimitiveComparison {
+ @SuppressWarnings("unused")
+ private static final Logger log = Logger.getLogger(StringCompare.class);
+
+ private String constant;
+
+ @Override
+ public void init(byte[] constant) {
+ this.constant = serialize(constant);
+ }
+
+ @Override
+ public boolean isEqual(byte[] value) {
+ return serialize(value).equals(constant);
+ }
+
+ @Override
+ public boolean isNotEqual(byte[] value) {
+ return !isEqual(value);
+ }
+
+ @Override
+ public boolean greaterThanOrEqual(byte[] value) {
+ return serialize(value).compareTo(constant) >= 0;
+ }
+
+ @Override
+ public boolean greaterThan(byte[] value) {
+ return serialize(value).compareTo(constant) > 0;
+ }
+
+ @Override
+ public boolean lessThanOrEqual(byte[] value) {
+ return serialize(value).compareTo(constant) <= 0;
+ }
+
+ @Override
+ public boolean lessThan(byte[] value) {
+ return serialize(value).compareTo(constant) < 0;
+ }
+
+ @Override
+ public boolean like(byte[] value) {
+ String temp = new String(value).replaceAll("%", "[\\\\\\w]+?");
+ Pattern pattern = Pattern.compile(temp);
+ boolean match = pattern.matcher(constant).matches();
+ return match;
+ }
+
+ public String serialize(byte[] value) {
+ return new String(value);
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/package-info.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/package-info.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/package-info.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/package-info.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,4 @@
+/**
+ * PrimitiveComparison and CompareOp implementations for use in the PrimitiveComparisonFilter iterator
+ */
+package org.apache.hadoop.hive.accumulo.predicate.compare;
\ No newline at end of file
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/package-info.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/package-info.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/package-info.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/package-info.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,4 @@
+/**
+ * Predicate pushdown to Accumulo filter iterators.
+ */
+package org.apache.hadoop.hive.accumulo.predicate;
\ No newline at end of file
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloCompositeRowId.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloCompositeRowId.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloCompositeRowId.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloCompositeRowId.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo.serde;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+
+/**
+ * AccumuloCompositeRowId extension of LazyStruct. All complex composite keys should extend this class
+ * and override the {@link LazyStruct#getField(int)} method where fieldID corresponds to the ID of a
+ * key in the composite key.
+ * <p>
+ * For example, for a composite key <i>"/part1/part2/part3"</i>, <i>part1</i> will have an id
+ * <i>0</i>, <i>part2</i> will have an id <i>1</i> and <i>part3</i> will have an id <i>2</i>. Custom
+ * implementations of getField(fieldID) should return the value corresponding to that fieldID. So,
+ * for the above example, the value returned for <i>getField(0)</i> should be <i>part1</i>,
+ * <i>getField(1)</i> should be <i>part2</i> and <i>getField(2)</i> should be <i>part3</i>.
+ * </p>
+ *
+ * <p>
+ * All custom implementations are expected to have a constructor of the form:
+ *
+ * <pre>
+ * MyCustomCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl, Configuration conf)
+ * </pre>
+ * </p>
+ *
+ */
+public class AccumuloCompositeRowId extends LazyStruct {
+
+ public AccumuloCompositeRowId(LazySimpleStructObjectInspector oi) {
+ super(oi);
+ }
+
+ @Override
+ public ArrayList<Object> getFieldsAsList() {
+ ArrayList<Object> allFields = new ArrayList<Object>();
+
+ List<? extends StructField> fields = oi.getAllStructFieldRefs();
+
+ for (int i = 0; i < fields.size(); i++) {
+ allFields.add(getField(i));
+ }
+
+ return allFields;
+ }
+
+ /**
+ * Create and initialize a {@link LazyObject} with the given bytes for the given fieldID.
+ *
+ * @param fieldID
+ * field for which the object is to be created
+ * @param bytes
+ * value with which the object is to be initialized
+ * @return initialized {@link LazyObject}
+ * */
+ public LazyObject<? extends ObjectInspector> toLazyObject(int fieldID, byte[] bytes) {
+ ObjectInspector fieldOI = oi.getAllStructFieldRefs().get(fieldID).getFieldObjectInspector();
+
+ LazyObject<? extends ObjectInspector> lazyObject = LazyFactory.createLazyObject(fieldOI);
+
+ ByteArrayRef ref = new ByteArrayRef();
+
+ ref.setData(bytes);
+
+ // initialize the lazy object
+ lazyObject.init(ref, 0, ref.getData().length);
+
+ return lazyObject;
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowIdFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowIdFactory.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowIdFactory.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowIdFactory.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.serde;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+/**
+ * Interface for providing custom Accumulo RowID generation/parsing
+ */
+public interface AccumuloRowIdFactory {
+
+ /**
+ * Initialize the factory with the SerDe parameters and table properties.
+ */
+ public void init(AccumuloSerDeParameters serDeParams, Properties properties)
+ throws SerDeException;
+
+ /**
+ * Create a custom ObjectInspector for the Accumulo rowId.
+ *
+ * @param type
+ * type information for the rowId column
+ */
+ public ObjectInspector createRowIdObjectInspector(TypeInfo type) throws SerDeException;
+
+ /**
+ * Create the custom lazy object used to back the Accumulo rowId.
+ *
+ * @param inspector
+ * OI created by {@link AccumuloRowIdFactory#createRowIdObjectInspector}
+ */
+ public LazyObjectBase createRowId(ObjectInspector inspector) throws SerDeException;
+
+ /**
+ * Serialize the Hive object into the internal (custom key) representation.
+ */
+ public byte[] serializeRowId(Object object, StructField field, ByteStream.Output output)
+ throws IOException;
+
+ /**
+ * Add this implementation to the classpath for the Job
+ */
+ public void addDependencyJars(Configuration conf) throws IOException;
+}
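To make the factory contract above concrete, here is a minimal sketch of an implementation that leans on Hive's standard lazy machinery. The class name SimpleStringRowIdFactory is hypothetical, the rowId is assumed to be a primitive Hive type, and the default factory actually shipped with the handler may differ.

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

/**
 * Hypothetical factory that treats the rowId as a single, lazily parsed primitive value.
 */
public class SimpleStringRowIdFactory implements AccumuloRowIdFactory {
  private SerDeParameters serDeParams;

  @Override
  public void init(AccumuloSerDeParameters params, Properties properties) throws SerDeException {
    this.serDeParams = params.getSerDeParameters();
  }

  @Override
  public ObjectInspector createRowIdObjectInspector(TypeInfo type) throws SerDeException {
    // Reuse the standard lazy ObjectInspector for the declared Hive type.
    return LazyFactory.createLazyObjectInspector(type, serDeParams.getSeparators(), 1,
        serDeParams.getNullSequence(), serDeParams.isEscaped(), serDeParams.getEscapeChar());
  }

  @Override
  public LazyObjectBase createRowId(ObjectInspector inspector) throws SerDeException {
    // Pair the ObjectInspector created above with the matching lazy object.
    return LazyFactory.createLazyObject(inspector);
  }

  @Override
  public byte[] serializeRowId(Object object, StructField field, ByteStream.Output output)
      throws IOException {
    // Write the primitive rowId as (optionally escaped) UTF-8 text.
    output.reset();
    LazyUtils.writePrimitiveUTF8(output, object,
        (PrimitiveObjectInspector) field.getFieldObjectInspector(), serDeParams.isEscaped(),
        serDeParams.getEscapeChar(), serDeParams.getNeedsEscape());
    return output.toByteArray();
  }

  @Override
  public void addDependencyJars(Configuration conf) throws IOException {
    // Nothing extra to add to the job's classpath for this sketch.
  }
}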
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowSerializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowSerializer.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowSerializer.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowSerializer.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.serde;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMapping;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloMapColumnMapping;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+
+/**
+ * Serializes a Struct to an Accumulo row as per the definition provided by the
+ * {@link ColumnMapping}s
+ */
+public class AccumuloRowSerializer {
+ private static final Logger log = Logger.getLogger(AccumuloRowSerializer.class);
+
+ private final int rowIdOffset;
+ private final ByteStream.Output output;
+ private final SerDeParameters serDeParams;
+ private final List<ColumnMapping> mappings;
+ private final ColumnVisibility visibility;
+ private final AccumuloRowIdFactory rowIdFactory;
+
+ public AccumuloRowSerializer(int primaryKeyOffset, SerDeParameters serDeParams,
+ List<ColumnMapping> mappings, ColumnVisibility visibility, AccumuloRowIdFactory rowIdFactory) {
+ Preconditions.checkArgument(primaryKeyOffset >= 0,
+ "A valid offset to the mapping for the Accumulo RowID is required, received "
+ + primaryKeyOffset);
+ this.rowIdOffset = primaryKeyOffset;
+ this.output = new ByteStream.Output();
+ this.serDeParams = serDeParams;
+ this.mappings = mappings;
+ this.visibility = visibility;
+ this.rowIdFactory = rowIdFactory;
+ }
+
+ public Mutation serialize(Object obj, ObjectInspector objInspector) throws SerDeException,
+ IOException {
+ if (objInspector.getCategory() != ObjectInspector.Category.STRUCT) {
+ throw new SerDeException(getClass().toString()
+ + " can only serialize struct types, but we got: " + objInspector.getTypeName());
+ }
+
+ // Prepare the field ObjectInspectors
+ StructObjectInspector soi = (StructObjectInspector) objInspector;
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ List<Object> columnValues = soi.getStructFieldsDataAsList(obj);
+
+ // Fail if we try to access an offset out of bounds
+ if (rowIdOffset >= fields.size()) {
+ throw new IllegalStateException(
+ "Attempted to access field outside of definition for struct. Have " + fields.size()
+ + " fields and tried to access offset " + rowIdOffset);
+ }
+
+ StructField field = fields.get(rowIdOffset);
+ Object value = columnValues.get(rowIdOffset);
+
+ // The ObjectInspector for the row ID
+ ObjectInspector fieldObjectInspector = field.getFieldObjectInspector();
+
+ log.info("Serializing rowId with " + value + " in " + field + " using "
+ + rowIdFactory.getClass());
+
+ // Serialize the row component using the RowIdFactory. In the normal case, this will just
+ // delegate back to the "local" serializeRowId method
+ byte[] data = rowIdFactory.serializeRowId(value, field, output);
+
+ // Set that as the row id in the mutation
+ Mutation mutation = new Mutation(data);
+
+ // Each column in the row
+ for (int i = 0; i < fields.size(); i++) {
+ if (rowIdOffset == i) {
+ continue;
+ }
+
+ // Get the relevant information for this column
+ field = fields.get(i);
+ value = columnValues.get(i);
+
+ // Despite having a fixed schema from Hive, we have sparse columns in Accumulo
+ if (null == value) {
+ continue;
+ }
+
+ // The ObjectInspector for the current column
+ fieldObjectInspector = field.getFieldObjectInspector();
+
+ // Make sure we got the right implementation of a ColumnMapping
+ ColumnMapping mapping = mappings.get(i);
+ if (mapping instanceof HiveAccumuloColumnMapping) {
+ serializeColumnMapping((HiveAccumuloColumnMapping) mapping, fieldObjectInspector, value,
+ mutation);
+ } else if (mapping instanceof HiveAccumuloMapColumnMapping) {
+ serializeColumnMapping((HiveAccumuloMapColumnMapping) mapping, fieldObjectInspector, value,
+ mutation);
+ } else {
+ throw new IllegalArgumentException("Mapping for " + field.getFieldName()
+ + " was not a HiveColumnMapping, but was " + mapping.getClass());
+ }
+
+ }
+
+ return mutation;
+ }
+
+ protected void serializeColumnMapping(HiveAccumuloColumnMapping columnMapping,
+ ObjectInspector fieldObjectInspector, Object value, Mutation mutation) throws IOException {
+ // Get the serialized value for the column
+ byte[] serializedValue = getSerializedValue(fieldObjectInspector, value, output, columnMapping);
+
+ // Put it all in the Mutation
+ mutation.put(columnMapping.getColumnFamilyBytes(), columnMapping.getColumnQualifierBytes(),
+ visibility, serializedValue);
+ }
+
+ /**
+ * Serialize a Hive map into the Accumulo row, writing one Accumulo column per map entry
+ */
+ protected void serializeColumnMapping(HiveAccumuloMapColumnMapping columnMapping,
+ ObjectInspector fieldObjectInspector, Object value, Mutation mutation) throws IOException {
+ MapObjectInspector mapObjectInspector = (MapObjectInspector) fieldObjectInspector;
+
+ Map<?,?> map = mapObjectInspector.getMap(value);
+ if (map == null) {
+ return;
+ }
+
+ ObjectInspector keyObjectInspector = mapObjectInspector.getMapKeyObjectInspector(), valueObjectInspector = mapObjectInspector
+ .getMapValueObjectInspector();
+
+ byte[] cfBytes = columnMapping.getColumnFamily().getBytes(Charsets.UTF_8), cqPrefixBytes = columnMapping
+ .getColumnQualifierPrefix().getBytes(Charsets.UTF_8);
+ byte[] cqBytes, valueBytes;
+ for (Entry<?,?> entry : map.entrySet()) {
+ output.reset();
+
+ // If the cq prefix is non-empty, add it to the CQ before we set the mutation
+ if (0 < cqPrefixBytes.length) {
+ output.write(cqPrefixBytes, 0, cqPrefixBytes.length);
+ }
+
+ // Write the "suffix" of the cq
+ writeWithLevel(keyObjectInspector, entry.getKey(), output, columnMapping, 3);
+ cqBytes = output.toByteArray();
+
+ output.reset();
+
+ // Write the value
+ writeWithLevel(valueObjectInspector, entry.getValue(), output, columnMapping, 3);
+ valueBytes = output.toByteArray();
+
+ mutation.put(cfBytes, cqBytes, visibility, valueBytes);
+ }
+ }
+
+ /**
+ * Serialize an Accumulo rowid
+ */
+ protected byte[] serializeRowId(Object rowId, StructField rowIdField, ColumnMapping rowIdMapping)
+ throws IOException {
+ if (rowId == null) {
+ throw new IOException("Accumulo rowId cannot be NULL");
+ }
+ // Reset the buffer we're going to use
+ output.reset();
+ ObjectInspector rowIdFieldOI = rowIdField.getFieldObjectInspector();
+ String rowIdMappingType = rowIdMapping.getColumnType();
+ TypeInfo rowIdTypeInfo = TypeInfoUtils.getTypeInfoFromTypeString(rowIdMappingType);
+
+ if (!rowIdFieldOI.getCategory().equals(ObjectInspector.Category.PRIMITIVE)
+ && rowIdTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+ // we always serialize the String type using the escaped algorithm for LazyString
+ writeString(output, SerDeUtils.getJSONString(rowId, rowIdFieldOI),
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+ return output.toByteArray();
+ }
+
+ // use the serialization option switch to write primitive values as either a variable
+ // length UTF8 string or a fixed width bytes if serializing in binary format
+ getSerializedValue(rowIdFieldOI, rowId, output, rowIdMapping);
+ return output.toByteArray();
+ }
+
+ /**
+ * Compute the serialized value from the given element and object inspectors. Based on the Hive
+ * types, represented through the ObjectInspectors for the whole object and column within the
+ * object, serialize the object appropriately.
+ *
+ * @param fieldObjectInspector
+ * ObjectInspector for the column value being serialized
+ * @param value
+ * The Object itself being serialized
+ * @param output
+ * A temporary buffer to reduce object creation
+ * @param mapping
+ * The column mapping, whose encoding determines how the value is serialized
+ * @return The serialized bytes from the provided value.
+ * @throws IOException
+ * An error occurred when performing IO to serialize the data
+ */
+ protected byte[] getSerializedValue(ObjectInspector fieldObjectInspector, Object value,
+ ByteStream.Output output, ColumnMapping mapping) throws IOException {
+ // Reset the buffer we're going to use
+ output.reset();
+
+ // Start by only serializing primitives as-is
+ if (fieldObjectInspector.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+ writeSerializedPrimitive((PrimitiveObjectInspector) fieldObjectInspector, output, value,
+ mapping.getEncoding());
+ } else {
+ // We only accept a struct, which means that we're already nested one level deep
+ writeWithLevel(fieldObjectInspector, value, output, mapping, 2);
+ }
+
+ return output.toByteArray();
+ }
+
+ /**
+ * Recursively serialize an Object using its {@link ObjectInspector}, respecting the
+ * separators defined by the {@link SerDeParameters}.
+ * @param oi ObjectInspector for the current object
+ * @param value The current object
+ * @param output The buffer the serialized value is written to
+ * @param mapping The mapping for this Hive column
+ * @param level The current level/offset for the SerDe separator
+ * @throws IOException
+ */
+ protected void writeWithLevel(ObjectInspector oi, Object value, ByteStream.Output output,
+ ColumnMapping mapping, int level) throws IOException {
+ switch (oi.getCategory()) {
+ case PRIMITIVE:
+ if (mapping.getEncoding() == ColumnEncoding.BINARY) {
+ this.writeBinary(output, value, (PrimitiveObjectInspector) oi);
+ } else {
+ this.writeString(output, value, (PrimitiveObjectInspector) oi);
+ }
+ return;
+ case LIST:
+ char separator = (char) serDeParams.getSeparators()[level];
+ ListObjectInspector loi = (ListObjectInspector) oi;
+ List<?> list = loi.getList(value);
+ ObjectInspector eoi = loi.getListElementObjectInspector();
+ if (list == null) {
+ log.debug("No objects found when serializing list");
+ return;
+ } else {
+ for (int i = 0; i < list.size(); i++) {
+ if (i > 0) {
+ output.write(separator);
+ }
+ writeWithLevel(eoi, list.get(i), output, mapping, level + 1);
+ }
+ }
+ return;
+ case MAP:
+ char sep = (char) serDeParams.getSeparators()[level];
+ char keyValueSeparator = (char) serDeParams.getSeparators()[level + 1];
+ MapObjectInspector moi = (MapObjectInspector) oi;
+ ObjectInspector koi = moi.getMapKeyObjectInspector();
+ ObjectInspector voi = moi.getMapValueObjectInspector();
+
+ Map<?,?> map = moi.getMap(value);
+ if (map == null) {
+ log.debug("No object found when serializing map");
+ return;
+ } else {
+ boolean first = true;
+ for (Map.Entry<?,?> entry : map.entrySet()) {
+ if (first) {
+ first = false;
+ } else {
+ output.write(sep);
+ }
+ writeWithLevel(koi, entry.getKey(), output, mapping, level + 2);
+ output.write(keyValueSeparator);
+ writeWithLevel(voi, entry.getValue(), output, mapping, level + 2);
+ }
+ }
+ return;
+ case STRUCT:
+ sep = (char) serDeParams.getSeparators()[level];
+ StructObjectInspector soi = (StructObjectInspector) oi;
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ list = soi.getStructFieldsDataAsList(value);
+ if (list == null) {
+ log.debug("No object found when serializing struct");
+ return;
+ } else {
+ for (int i = 0; i < list.size(); i++) {
+ if (i > 0) {
+ output.write(sep);
+ }
+
+ writeWithLevel(fields.get(i).getFieldObjectInspector(), list.get(i), output, mapping,
+ level + 1);
+ }
+ }
+
+ return;
+ default:
+ throw new RuntimeException("Unknown category type: " + oi.getCategory());
+ }
+ }
+
+ /**
+ * Serialize the given primitive to the given output buffer, using the provided encoding
+ * mechanism.
+ *
+ * @param objectInspector
+ * The PrimitiveObjectInspector for this Object
+ * @param output
+ * A buffer to write the serialized value to
+ * @param value
+ * The Object being serialized
+ * @param encoding
+ * The encoding to use when serializing the Object
+ * @throws IOException
+ */
+ protected void writeSerializedPrimitive(PrimitiveObjectInspector objectInspector,
+ ByteStream.Output output, Object value, ColumnEncoding encoding) throws IOException {
+ // Despite STRING being a primitive, it can't be serialized as binary
+ if (objectInspector.getPrimitiveCategory() != PrimitiveCategory.STRING && ColumnEncoding.BINARY == encoding) {
+ writeBinary(output, value, objectInspector);
+ } else {
+ writeString(output, value, objectInspector);
+ }
+ }
+
+ protected void writeBinary(ByteStream.Output output, Object value,
+ PrimitiveObjectInspector inspector) throws IOException {
+ LazyUtils.writePrimitive(output, value, inspector);
+ }
+
+ protected void writeString(ByteStream.Output output, Object value,
+ PrimitiveObjectInspector inspector) throws IOException {
+ LazyUtils.writePrimitiveUTF8(output, value, inspector, serDeParams.isEscaped(),
+ serDeParams.getEscapeChar(), serDeParams.getNeedsEscape());
+ }
+
+ protected ColumnVisibility getVisibility() {
+ return visibility;
+ }
+}
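The writeWithLevel logic above threads a nesting level through the recursion so each depth of a complex value gets its own separator from SerDeParameters.getSeparators(): a non-primitive column enters at level 2 (see getSerializedValue), lists and structs recurse at level + 1, and maps consume two separators (entry and key/value) before recursing at level + 2. The toy snippet below mirrors only that index arithmetic with made-up separator characters; it does not use Hive's actual separator bytes or escaping.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the separator-per-level scheme used by writeWithLevel. */
public class SeparatorLevelSketch {
  // Stand-in separators, indexed the same way as serDeParams.getSeparators().
  private static final char[] SEPARATORS = {'^', '~', ',', ':', ';', '|'};

  // LIST case: elements joined with SEPARATORS[level]; nested values would use level + 1.
  static String writeList(List<String> list, int level) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < list.size(); i++) {
      if (i > 0) {
        sb.append(SEPARATORS[level]);
      }
      sb.append(list.get(i));
    }
    return sb.toString();
  }

  // MAP case: SEPARATORS[level] between entries, SEPARATORS[level + 1] between key and value.
  static String writeMap(Map<String, String> map, int level) {
    StringBuilder sb = new StringBuilder();
    boolean first = true;
    for (Map.Entry<String, String> e : map.entrySet()) {
      if (!first) {
        sb.append(SEPARATORS[level]);
      }
      first = false;
      sb.append(e.getKey()).append(SEPARATORS[level + 1]).append(e.getValue());
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // A column of type array<string>, serialized as a single Accumulo value, starts at level 2.
    System.out.println(writeList(Arrays.asList("a", "b", "c"), 2)); // a,b,c
    // A column of type map<string,string>, serialized as a single Accumulo value.
    Map<String, String> m = new LinkedHashMap<String, String>();
    m.put("k1", "v1");
    m.put("k2", "v2");
    System.out.println(writeMap(m, 2)); // k1:v1,k2:v2
  }
}

By contrast, a map column bound through HiveAccumuloMapColumnMapping is not flattened into a single value at all: each entry becomes its own Accumulo column, with the key written (at level 3) as the column-qualifier suffix and the value as the cell value.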
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDe.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDe.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDe.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,140 @@
+package org.apache.hadoop.hive.accumulo.serde;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.AccumuloHiveRow;
+import org.apache.hadoop.hive.accumulo.LazyAccumuloRow;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMapping;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloRowIdColumnMapping;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SerDe bridging Hive and Accumulo: deserializes Accumulo data into a LazyAccumuloRow and
+ * serializes Hive structs into Accumulo Mutations.
+ *
+ */
+public class AccumuloSerDe implements SerDe {
+
+ private AccumuloSerDeParameters accumuloSerDeParameters;
+ private LazyAccumuloRow cachedRow;
+ private ObjectInspector cachedObjectInspector;
+ private AccumuloRowSerializer serializer;
+
+ private static final Logger log = LoggerFactory.getLogger(AccumuloSerDe.class);
+
+ public void initialize(Configuration conf, Properties properties) throws SerDeException {
+ accumuloSerDeParameters = new AccumuloSerDeParameters(conf, properties, getClass().getName());
+
+ final SerDeParameters serDeParams = accumuloSerDeParameters.getSerDeParameters();
+ final List<ColumnMapping> mappings = accumuloSerDeParameters.getColumnMappings();
+ final List<TypeInfo> columnTypes = accumuloSerDeParameters.getHiveColumnTypes();
+ final AccumuloRowIdFactory factory = accumuloSerDeParameters.getRowIdFactory();
+
+ ArrayList<ObjectInspector> columnObjectInspectors = getColumnObjectInspectors(columnTypes, serDeParams, mappings, factory);
+
+ cachedObjectInspector = LazyObjectInspectorFactory.getLazySimpleStructObjectInspector(
+ serDeParams.getColumnNames(), columnObjectInspectors, serDeParams.getSeparators()[0],
+ serDeParams.getNullSequence(), serDeParams.isLastColumnTakesRest(),
+ serDeParams.isEscaped(), serDeParams.getEscapeChar());
+
+ cachedRow = new LazyAccumuloRow((LazySimpleStructObjectInspector) cachedObjectInspector);
+
+ serializer = new AccumuloRowSerializer(accumuloSerDeParameters.getRowIdOffset(),
+ accumuloSerDeParameters.getSerDeParameters(), accumuloSerDeParameters.getColumnMappings(),
+ accumuloSerDeParameters.getTableVisibilityLabel(),
+ accumuloSerDeParameters.getRowIdFactory());
+
+ if (log.isInfoEnabled()) {
+ log.info("Initialized with {} type: {}", accumuloSerDeParameters.getSerDeParameters()
+ .getColumnNames(), accumuloSerDeParameters.getSerDeParameters().getColumnTypes());
+ }
+ }
+
+ protected ArrayList<ObjectInspector> getColumnObjectInspectors(List<TypeInfo> columnTypes,
+ SerDeParameters serDeParams, List<ColumnMapping> mappings, AccumuloRowIdFactory factory)
+ throws SerDeException {
+ ArrayList<ObjectInspector> columnObjectInspectors = new ArrayList<ObjectInspector>(
+ columnTypes.size());
+ for (int i = 0; i < columnTypes.size(); i++) {
+ TypeInfo type = columnTypes.get(i);
+ ColumnMapping mapping = mappings.get(i);
+ if (mapping instanceof HiveAccumuloRowIdColumnMapping) {
+ columnObjectInspectors.add(factory.createRowIdObjectInspector(type));
+ } else {
+ columnObjectInspectors.add(LazyFactory.createLazyObjectInspector(type,
+ serDeParams.getSeparators(), 1, serDeParams.getNullSequence(), serDeParams.isEscaped(),
+ serDeParams.getEscapeChar()));
+ }
+ }
+
+ return columnObjectInspectors;
+ }
+
+ /**
+ * For testing purposes.
+ */
+ public LazyAccumuloRow getCachedRow() {
+ return cachedRow;
+ }
+
+ public Class<? extends Writable> getSerializedClass() {
+ return Mutation.class;
+ }
+
+ @Override
+ public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
+ try {
+ return serializer.serialize(o, objectInspector);
+ } catch (IOException e) {
+ throw new SerDeException(e);
+ }
+ }
+
+ @Override
+ public Object deserialize(Writable writable) throws SerDeException {
+ if (!(writable instanceof AccumuloHiveRow)) {
+ throw new SerDeException(getClass().getName() + " : " + "Expected AccumuloHiveRow. Got "
+ + writable.getClass().getName());
+ }
+
+ cachedRow.init((AccumuloHiveRow) writable, accumuloSerDeParameters.getColumnMappings(),
+ accumuloSerDeParameters.getRowIdFactory());
+
+ return cachedRow;
+ }
+
+ public ObjectInspector getObjectInspector() throws SerDeException {
+ return cachedObjectInspector;
+ }
+
+ public SerDeStats getSerDeStats() {
+ throw new UnsupportedOperationException("SerdeStats not supported.");
+ }
+
+ public AccumuloSerDeParameters getParams() {
+ return accumuloSerDeParameters;
+ }
+
+ public boolean getIteratorPushdown() {
+ return accumuloSerDeParameters.getIteratorPushdown();
+ }
+
+ protected AccumuloRowSerializer getSerializer() {
+ return serializer;
+ }
+}
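For orientation, a rough usage sketch of the SerDe outside a full Hive table follows. The column-mapping property name "accumulo.columns.mapping" and the ":rowID" token are assumptions drawn from the handler's configuration conventions and are not defined in the files shown here; AccumuloSerDeParameters may require additional properties in practice.

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

public class AccumuloSerDeSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Standard SerDe schema properties.
    props.setProperty(serdeConstants.LIST_COLUMNS, "row,name");
    props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string,string");
    // Assumed column mapping: the first Hive column backs the Accumulo rowId,
    // the second maps to column family "cf", qualifier "name".
    props.setProperty("accumulo.columns.mapping", ":rowID,cf:name");

    AccumuloSerDe serDe = new AccumuloSerDe();
    serDe.initialize(new Configuration(), props);

    // The ObjectInspector mirrors the declared Hive schema; serialize(Object, ObjectInspector)
    // would then turn a two-field struct into a Mutation keyed on the "row" column.
    ObjectInspector oi = serDe.getObjectInspector();
    System.out.println(oi.getTypeName());
  }
}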