Posted to commits@hive.apache.org by kh...@apache.org on 2014/08/20 00:41:13 UTC

svn commit: r1619005 [2/9] - in /hive/trunk: ./ accumulo-handler/ accumulo-handler/src/ accumulo-handler/src/java/ accumulo-handler/src/java/org/ accumulo-handler/src/java/org/apache/ accumulo-handler/src/java/org/apache/hadoop/ accumulo-handler/src/ja...

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMappingFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMappingFactory.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMappingFactory.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMappingFactory.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.columns;
+
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * Factory which constructs the appropriate {@link ColumnMapping} implementation (row ID, single
+ * column, or map of columns) from a column specification.
+ */
+public class ColumnMappingFactory {
+  private static final Logger log = Logger.getLogger(ColumnMappingFactory.class);
+
+  /**
+   * Generate the proper instance of a ColumnMapping.
+   *
+   * @param columnSpec
+   *          Specification for mapping this column to Accumulo
+   * @param defaultEncoding
+   *          The default encoding in which values should be encoded to Accumulo
+   * @param columnName
+   *          The name of the Hive column this mapping serves
+   * @param columnType
+   *          The Hive type of the column
+   * @return a ColumnMapping appropriate for the given specification
+   */
+  public static ColumnMapping get(String columnSpec, ColumnEncoding defaultEncoding,
+      String columnName, TypeInfo columnType) {
+    Preconditions.checkNotNull(columnSpec);
+    Preconditions.checkNotNull(columnName);
+    Preconditions.checkNotNull(columnType);
+    ColumnEncoding encoding = defaultEncoding;
+
+    // Check for column encoding specification
+    if (ColumnEncoding.hasColumnEncoding(columnSpec)) {
+      String columnEncodingStr = ColumnEncoding.getColumnEncoding(columnSpec);
+      columnSpec = ColumnEncoding.stripCode(columnSpec);
+
+      if (AccumuloHiveConstants.ROWID.equalsIgnoreCase(columnSpec)) {
+        return new HiveAccumuloRowIdColumnMapping(columnSpec,
+            ColumnEncoding.get(columnEncodingStr), columnName, columnType.getTypeName());
+      } else {
+        Entry<String,String> pair = parseMapping(columnSpec);
+
+        if (isPrefix(pair.getValue())) {
+          // Sanity check that, for a map, we got 2 encodings
+          if (!ColumnEncoding.isMapEncoding(columnEncodingStr)) {
+            throw new IllegalArgumentException("Expected map encoding for a map specification, "
+                + columnSpec + " with encoding " + columnEncodingStr);
+          }
+
+          Entry<ColumnEncoding,ColumnEncoding> encodings = ColumnEncoding
+              .getMapEncoding(columnEncodingStr);
+
+          // Strip the trailing asterisk from the qualifier prefix, mirroring the
+          // no-encoding branch below
+          String prefix = pair.getValue().substring(0, pair.getValue().length() - 1);
+          return new HiveAccumuloMapColumnMapping(pair.getKey(), prefix,
+              encodings.getKey(), encodings.getValue(), columnName, columnType.getTypeName());
+        } else {
+          return new HiveAccumuloColumnMapping(pair.getKey(), pair.getValue(),
+              ColumnEncoding.getFromMapping(columnEncodingStr), columnName, columnType.getTypeName());
+        }
+      }
+    } else {
+      if (AccumuloHiveConstants.ROWID.equalsIgnoreCase(columnSpec)) {
+        return new HiveAccumuloRowIdColumnMapping(columnSpec, defaultEncoding, columnName,
+            columnType.getTypeName());
+      } else {
+        Entry<String,String> pair = parseMapping(columnSpec);
+        boolean isPrefix = isPrefix(pair.getValue());
+
+        String cq = pair.getValue();
+
+        // Replace any \* that appears in the qualifier with a literal *
+        if (-1 != cq.indexOf(AccumuloHiveConstants.ESCAPED_ASTERISK)) {
+          cq = cq.replaceAll(AccumuloHiveConstants.ESCAPED_ASERTISK_REGEX,
+              Character.toString(AccumuloHiveConstants.ASTERISK));
+        }
+
+        if (isPrefix) {
+          return new HiveAccumuloMapColumnMapping(pair.getKey(), cq.substring(0, cq.length() - 1),
+              defaultEncoding, defaultEncoding, columnName, columnType.getTypeName());
+        } else {
+          return new HiveAccumuloColumnMapping(pair.getKey(), cq, encoding, columnName, columnType.getTypeName());
+        }
+      }
+    }
+  }
+
+  public static ColumnMapping getMap(String columnSpec, ColumnEncoding keyEncoding,
+      ColumnEncoding valueEncoding, String columnName, TypeInfo columnType) {
+    Entry<String,String> pair = parseMapping(columnSpec);
+    return new HiveAccumuloMapColumnMapping(pair.getKey(), pair.getValue(), keyEncoding,
+        valueEncoding, columnName, columnType.getTypeName());
+  }
+
+  public static boolean isPrefix(String maybePrefix) {
+    Preconditions.checkNotNull(maybePrefix);
+
+    if (AccumuloHiveConstants.ASTERISK == maybePrefix.charAt(maybePrefix.length() - 1)) {
+      if (maybePrefix.length() > 1) {
+        return AccumuloHiveConstants.ESCAPE != maybePrefix.charAt(maybePrefix.length() - 2);
+      } else {
+        return true;
+      }
+    }
+
+    // If we couldn't find an asterisk, it's not a prefix
+    return false;
+  }
+
+  /**
+   * Consumes the column mapping specification and breaks it into column family and column
+   * qualifier, honoring escaped colons.
+   *
+   * @param columnSpec
+   *          The column specification, e.g. "cf:cq", with any literal colons escaped as "\:"
+   * @return an Entry of column family to column qualifier
+   */
+  public static Entry<String,String> parseMapping(String columnSpec)
+      throws InvalidColumnMappingException {
+    int index = 0;
+    while (true) {
+      if (index >= columnSpec.length()) {
+        log.error("Cannot parse '" + columnSpec + "' as colon-separated column configuration");
+        throw new InvalidColumnMappingException(
+            "Columns must be provided as colon-separated family and qualifier pairs");
+      }
+
+      index = columnSpec.indexOf(AccumuloHiveConstants.COLON, index);
+
+      if (-1 == index) {
+        log.error("Cannot parse '" + columnSpec + "' as colon-separated column configuration");
+        throw new InvalidColumnMappingException(
+            "Columns must be provided as colon-separated family and qualifier pairs");
+      }
+
+      // Check for an escape character before the colon
+      if (index - 1 >= 0) {
+        char testChar = columnSpec.charAt(index - 1);
+        if (AccumuloHiveConstants.ESCAPE == testChar) {
+          // this colon is escaped, search again after it
+          index++;
+          continue;
+        }
+
+        // If the previous character isn't an escape character, this colon is the separator
+      }
+
+      // Can't be escaped, it is the separator
+      break;
+    }
+
+    String cf = columnSpec.substring(0, index), cq = columnSpec.substring(index + 1);
+
+    // Check for the escaped colon to remove before doing the expensive regex replace
+    if (-1 != cf.indexOf(AccumuloHiveConstants.ESCAPED_COLON)) {
+      cf = cf.replaceAll(AccumuloHiveConstants.ESCAPED_COLON_REGEX,
+          Character.toString(AccumuloHiveConstants.COLON));
+    }
+
+    // Check for the escaped colon to remove before doing the expensive regex replace
+    if (-1 != cq.indexOf(AccumuloHiveConstants.ESCAPED_COLON)) {
+      cq = cq.replaceAll(AccumuloHiveConstants.ESCAPED_COLON_REGEX,
+          Character.toString(AccumuloHiveConstants.COLON));
+    }
+
+    return Maps.immutableEntry(cf, cq);
+  }
+}
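
To make the escaping rules concrete, here is a small usage sketch (not part of the
commit) against the factory above; it assumes the accumulo-handler classes added in
this commit are on the classpath:

    import java.util.Map.Entry;

    import org.apache.hadoop.hive.accumulo.columns.ColumnMappingFactory;

    public class ColumnSpecSketch {
      public static void main(String[] args) {
        // A plain family:qualifier pair splits on the first unescaped colon
        Entry<String,String> plain = ColumnMappingFactory.parseMapping("cf:cq");
        System.out.println(plain.getKey() + " / " + plain.getValue()); // cf / cq

        // An escaped colon ("\:") is part of the family, not a separator
        Entry<String,String> escaped = ColumnMappingFactory.parseMapping("c\\:f:cq");
        System.out.println(escaped.getKey() + " / " + escaped.getValue()); // c:f / cq

        // A trailing unescaped asterisk marks a qualifier prefix (a Hive map column)
        System.out.println(ColumnMappingFactory.isPrefix("prefix*"));    // true
        System.out.println(ColumnMappingFactory.isPrefix("literal\\*")); // false
      }
    }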

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloColumnMapping.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloColumnMapping.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloColumnMapping.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloColumnMapping.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.columns;
+
+import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Charsets;
+
+/**
+ * A Hive column which maps to a column family and column qualifier pair in Accumulo
+ */
+public class HiveAccumuloColumnMapping extends ColumnMapping {
+  @SuppressWarnings("unused")
+  private static final Logger log = Logger.getLogger(HiveAccumuloColumnMapping.class);
+
+  protected String columnFamily, columnQualifier;
+  protected byte[] columnFamilyBytes, columnQualifierBytes;
+
+  public HiveAccumuloColumnMapping(String cf, String cq, ColumnEncoding encoding,
+      String columnName, String columnType) {
+    super(cf + AccumuloHiveConstants.COLON + cq, encoding, columnName, columnType);
+
+    columnFamily = cf;
+    columnQualifier = cq;
+  }
+
+  public String getColumnFamily() {
+    return this.columnFamily;
+  }
+
+  /**
+   * Cached bytes for the columnFamily. Modifications to the returned array will alter the bytes
+   * stored in this ColumnMapping -- such modifications are strongly discouraged.
+   *
+   * @return UTF-8 encoded bytes
+   */
+  public byte[] getColumnFamilyBytes() {
+    if (null == columnFamilyBytes) {
+      columnFamilyBytes = columnFamily.getBytes(Charsets.UTF_8);
+    }
+
+    return columnFamilyBytes;
+  }
+
+  public String getColumnQualifier() {
+    return this.columnQualifier;
+  }
+
+  /**
+   * Cached bytes for the columnQualifier. Modifications to the returned array will alter the
+   * bytes stored in this ColumnMapping -- such modifications are strongly discouraged.
+   *
+   * @return UTF-8 encoded bytes
+   */
+  public byte[] getColumnQualifierBytes() {
+    if (null == columnQualifierBytes) {
+      columnQualifierBytes = columnQualifier.getBytes(Charsets.UTF_8);
+    }
+
+    return columnQualifierBytes;
+  }
+
+  public String serialize() {
+    StringBuilder sb = new StringBuilder(16);
+    sb.append(columnFamily).append(AccumuloHiveConstants.COLON);
+    if (null != columnQualifier) {
+      sb.append(columnQualifier);
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public String toString() {
+    return "[" + this.getClass().getSimpleName() + ": " + columnFamily + ":" + columnQualifier
+        + ", encoding " + encoding + "]";
+  }
+}
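
A quick construction sketch (not in the commit; ColumnEncoding.STRING is assumed to be
the string-encoding constant of the ColumnEncoding enum):

    import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
    import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping;

    public class ColumnMappingSketch {
      public static void main(String[] args) {
        // Map the Hive string column "name" onto the Accumulo column personal:name
        HiveAccumuloColumnMapping m = new HiveAccumuloColumnMapping(
            "personal", "name", ColumnEncoding.STRING, "name", "string");

        System.out.println(m.serialize());                   // personal:name
        System.out.println(m.getColumnFamilyBytes().length); // 8 -- cached UTF-8 bytes
      }
    }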

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloMapColumnMapping.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloMapColumnMapping.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloMapColumnMapping.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloMapColumnMapping.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.columns;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * ColumnMapping for combining Accumulo columns into a single Hive Map. Expects ColumnEncoding
+ * values for both the Key and Value of the Map.
+ */
+public class HiveAccumuloMapColumnMapping extends ColumnMapping {
+
+  protected final String columnFamily, columnQualifierPrefix;
+  protected final ColumnEncoding keyEncoding, valueEncoding;
+
+  /**
+   * @param columnFamily
+   *          The column family whose qualifiers should all be placed into the same Hive map
+   * @param columnQualifierPrefix
+   *          The column qualifier prefix to include in the map; null is treated as an empty prefix
+   * @param keyEncoding
+   *          The encoding scheme for keys in this column family
+   * @param valueEncoding
+   *          The encoding scheme for the Accumulo values
+   * @param columnName
+   *          The name of the Hive column
+   * @param columnType
+   *          The Hive type of the column
+   */
+  public HiveAccumuloMapColumnMapping(String columnFamily, String columnQualifierPrefix,
+      ColumnEncoding keyEncoding, ColumnEncoding valueEncoding, String columnName,
+      String columnType) {
+    // Try to make something reasonable to pass up to the base class
+    super((null == columnFamily ? "" : columnFamily) + AccumuloHiveConstants.COLON, valueEncoding,
+        columnName, columnType);
+
+    Preconditions.checkNotNull(columnFamily, "Must provide a column family");
+
+    this.columnFamily = columnFamily;
+    this.columnQualifierPrefix = (null == columnQualifierPrefix) ? "" : columnQualifierPrefix;
+    this.keyEncoding = keyEncoding;
+    this.valueEncoding = valueEncoding;
+  }
+
+  public String getColumnFamily() {
+    return columnFamily;
+  }
+
+  public String getColumnQualifierPrefix() {
+    return columnQualifierPrefix;
+  }
+
+  public ColumnEncoding getKeyEncoding() {
+    return keyEncoding;
+  }
+
+  public ColumnEncoding getValueEncoding() {
+    return valueEncoding;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof HiveAccumuloMapColumnMapping) {
+      HiveAccumuloMapColumnMapping other = (HiveAccumuloMapColumnMapping) o;
+      return columnFamily.equals(other.columnFamily)
+          && columnQualifierPrefix.equals(other.columnQualifierPrefix)
+          && keyEncoding.equals(other.keyEncoding) && valueEncoding.equals(other.valueEncoding);
+    }
+
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder(23, 31);
+    hcb.append(columnFamily).append(columnQualifierPrefix).append(keyEncoding)
+        .append(valueEncoding);
+    return hcb.toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "[" + this.getClass().getSimpleName() + ": " + columnFamily + ":"
+        + columnQualifierPrefix + "* encoding: " + keyEncoding + ":" + valueEncoding + "]";
+  }
+}
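
A sketch (not in the commit, same ColumnEncoding.STRING assumption as above) showing
that equality is defined over family, qualifier prefix, and the two encodings:

    import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
    import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloMapColumnMapping;

    public class MapColumnMappingSketch {
      public static void main(String[] args) {
        // Every qualifier in family "tags" starting with "user_" lands in one Hive
        // map column; both map keys and values are string-encoded here.
        HiveAccumuloMapColumnMapping a = new HiveAccumuloMapColumnMapping("tags", "user_",
            ColumnEncoding.STRING, ColumnEncoding.STRING, "tags", "map<string,string>");
        HiveAccumuloMapColumnMapping b = new HiveAccumuloMapColumnMapping("tags", "user_",
            ColumnEncoding.STRING, ColumnEncoding.STRING, "tags", "map<string,string>");

        // equals/hashCode agree for identical family, prefix, and encodings
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // true
      }
    }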

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloRowIdColumnMapping.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloRowIdColumnMapping.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloRowIdColumnMapping.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloRowIdColumnMapping.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.columns;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * {@link ColumnMapping} which corresponds to the Hive column which should be used as the rowID in a
+ * {@link Mutation}
+ */
+public class HiveAccumuloRowIdColumnMapping extends ColumnMapping {
+
+  public HiveAccumuloRowIdColumnMapping(String columnSpec, ColumnEncoding encoding,
+      String columnName, String columnType) {
+    super(columnSpec, encoding, columnName, columnType);
+
+    // Ensure that we have the correct identifier as the column name
+    Preconditions.checkArgument(columnSpec.equalsIgnoreCase(AccumuloHiveConstants.ROWID));
+  }
+
+  @Override
+  public String toString() {
+    return "[" + this.getClass().getSimpleName() + ", " + this.mappingSpec + ", encoding "
+        + encoding + "]";
+  }
+}
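
A sketch (not in the commit; ColumnEncoding.STRING assumed as before) of the
precondition: any spec other than the rowID token is rejected:

    import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
    import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
    import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloRowIdColumnMapping;

    public class RowIdMappingSketch {
      public static void main(String[] args) {
        // Only the special rowID token is accepted as the column spec
        new HiveAccumuloRowIdColumnMapping(AccumuloHiveConstants.ROWID,
            ColumnEncoding.STRING, "row", "string"); // ok

        try {
          new HiveAccumuloRowIdColumnMapping("cf:cq",
              ColumnEncoding.STRING, "row", "string");
        } catch (IllegalArgumentException expected) {
          System.out.println("rejected: not the rowID token"); // thrown by checkArgument
        }
      }
    }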

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveColumn.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveColumn.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveColumn.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveColumn.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.columns;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A column in the Hive schema: the column's name and its Hive type.
+ */
+public class HiveColumn {
+
+  // The name of this column in the Hive schema
+  protected final String columnName;
+
+  // The Hive type of this column
+  protected final TypeInfo columnType;
+
+  public HiveColumn(String columnName, TypeInfo columnType) {
+    Preconditions.checkNotNull(columnName);
+    Preconditions.checkNotNull(columnType);
+
+    this.columnName = columnName;
+    this.columnType = columnType;
+  }
+
+  /**
+   * Get the name of the Hive column
+   */
+  public String getColumnName() {
+    return columnName;
+  }
+
+  /**
+   * The Hive type of this column
+   */
+  public TypeInfo getColumnType() {
+    return columnType;
+  }
+}

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/InvalidColumnMappingException.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/InvalidColumnMappingException.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/InvalidColumnMappingException.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/InvalidColumnMappingException.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.columns;
+
+/**
+ * Thrown when a column mapping specification cannot be parsed into Accumulo column elements.
+ */
+public class InvalidColumnMappingException extends IllegalArgumentException {
+
+  private static final long serialVersionUID = 1L;
+
+  public InvalidColumnMappingException() {
+    super();
+  }
+
+  public InvalidColumnMappingException(String msg) {
+    super(msg);
+  }
+
+  public InvalidColumnMappingException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public InvalidColumnMappingException(Throwable cause) {
+    super(cause);
+  }
+}

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloRecordReader.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloRecordReader.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloRecordReader.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.mr;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.hive.accumulo.AccumuloHiveRow;
+import org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Translate the {@link Key} {@link Value} pairs from {@link AccumuloInputFormat} to a
+ * {@link Writable} for consumption by the {@link AccumuloSerDe}.
+ */
+public class HiveAccumuloRecordReader implements RecordReader<Text,AccumuloHiveRow> {
+  private RecordReader<Text,PeekingIterator<Entry<Key,Value>>> recordReader;
+  private int iteratorCount;
+
+  public HiveAccumuloRecordReader(
+      RecordReader<Text,PeekingIterator<Entry<Key,Value>>> recordReader, int iteratorCount) {
+    this.recordReader = recordReader;
+    this.iteratorCount = iteratorCount;
+  }
+
+  @Override
+  public void close() throws IOException {
+    recordReader.close();
+  }
+
+  @Override
+  public Text createKey() {
+    return new Text();
+  }
+
+  @Override
+  public AccumuloHiveRow createValue() {
+    return new AccumuloHiveRow();
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return recordReader.getProgress();
+  }
+
+  @Override
+  public boolean next(Text rowKey, AccumuloHiveRow row) throws IOException {
+    Text key = recordReader.createKey();
+    PeekingIterator<Map.Entry<Key,Value>> iter = recordReader.createValue();
+    if (recordReader.next(key, iter)) {
+      row.clear();
+      row.setRowId(key.toString());
+      List<Key> keys = Lists.newArrayList();
+      List<Value> values = Lists.newArrayList();
+      while (iter.hasNext()) { // collect key/values for this row.
+        Map.Entry<Key,Value> kv = iter.next();
+        keys.add(kv.getKey());
+        values.add(kv.getValue());
+      }
+      if (iteratorCount == 0) { // no encoded values, we can push directly to row.
+        pushToValue(keys, values, row);
+      } else {
+        for (int i = 0; i < iteratorCount; i++) { // each iterator creates a level of encoding.
+          SortedMap<Key,Value> decoded = PrimitiveComparisonFilter.decodeRow(keys.get(0),
+              values.get(0));
+          keys = Lists.newArrayList(decoded.keySet());
+          values = Lists.newArrayList(decoded.values());
+        }
+        pushToValue(keys, values, row); // after decoding we can push to value.
+      }
+
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  // flatten key/value pairs into row object for use in Serde.
+  private void pushToValue(List<Key> keys, List<Value> values, AccumuloHiveRow row)
+      throws IOException {
+    Iterator<Key> kIter = keys.iterator();
+    Iterator<Value> vIter = values.iterator();
+    while (kIter.hasNext()) {
+      Key k = kIter.next();
+      Value v = vIter.next();
+      row.add(k.getColumnFamily().toString(), k.getColumnQualifier().toString(), v.get());
+    }
+  }
+}
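
A hedged sketch of the standard old-API read loop over this reader; the reader itself
would come from HiveAccumuloTableInputFormat.getRecordReader(...) later in this commit:

    import java.io.IOException;

    import org.apache.hadoop.hive.accumulo.AccumuloHiveRow;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.RecordReader;

    public class ReadLoopSketch {
      // Drain a reader; next(...) fills `row` with every (family, qualifier, value)
      // triple for one Accumulo row, decoding iterator-encoded values first.
      static void drain(RecordReader<Text,AccumuloHiveRow> reader) throws IOException {
        Text rowId = reader.createKey();
        AccumuloHiveRow row = reader.createValue();
        while (reader.next(rowId, row)) {
          System.out.println(rowId + " -> " + row);
        }
        reader.close();
      }
    }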

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo.mr;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.mapred.RangeInputSplit;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Wraps RangeInputSplit into a FileSplit so Hadoop won't complain when it tries to make its own
+ * Path.
+ *
+ * <p>
+ * If the {@link RangeInputSplit} is used directly, it will hit a branch of code in
+ * {@link HiveInputSplit} which generates an invalid Path. Wrap it ourselves so that it doesn't
+ * error out.
+ */
+public class HiveAccumuloSplit extends FileSplit implements InputSplit {
+  private static final Logger log = Logger.getLogger(HiveAccumuloSplit.class);
+
+  private RangeInputSplit split;
+
+  public HiveAccumuloSplit() {
+    super((Path) null, 0, 0, (String[]) null);
+    split = new RangeInputSplit();
+  }
+
+  public HiveAccumuloSplit(RangeInputSplit split, Path dummyPath) {
+    super(dummyPath, 0, 0, (String[]) null);
+    this.split = split;
+  }
+
+  public RangeInputSplit getSplit() {
+    return this.split;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    split.readFields(in);
+  }
+
+  @Override
+  public String toString() {
+    return "HiveAccumuloSplit: " + split;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    split.write(out);
+  }
+
+  @Override
+  public long getLength() {
+    try {
+      return split.getLength();
+    } catch (IOException e) {
+      log.error("Error getting length for split: " + StringUtils.stringifyException(e));
+      return 0;
+    }
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+    return split.getLocations();
+  }
+}

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,485 @@
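+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */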
+package org.apache.hadoop.hive.accumulo.mr;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapred.AccumuloRowInputFormat;
+import org.apache.accumulo.core.client.mapred.RangeInputSplit;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters;
+import org.apache.hadoop.hive.accumulo.AccumuloHiveRow;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMapper;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMapping;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloMapColumnMapping;
+import org.apache.hadoop.hive.accumulo.predicate.AccumuloPredicateHandler;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.apache.hadoop.hive.accumulo.serde.TooManyAccumuloColumnsException;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps older InputFormat for use with Hive.
+ *
+ * Configure input scan with proper ranges, iterators, and columns based on serde properties for
+ * the Hive table.
+ */
+public class HiveAccumuloTableInputFormat implements
+    org.apache.hadoop.mapred.InputFormat<Text,AccumuloHiveRow> {
+  private static final Logger log = LoggerFactory.getLogger(HiveAccumuloTableInputFormat.class);
+
+  // Visible for testing
+  protected AccumuloRowInputFormat accumuloInputFormat = new AccumuloRowInputFormat();
+  protected AccumuloPredicateHandler predicateHandler = AccumuloPredicateHandler.getInstance();
+
+  @Override
+  public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
+    final AccumuloConnectionParameters accumuloParams = new AccumuloConnectionParameters(jobConf);
+    final Instance instance = accumuloParams.getInstance();
+    final ColumnMapper columnMapper;
+    try {
+      columnMapper = getColumnMapper(jobConf);
+    } catch (TooManyAccumuloColumnsException e) {
+      throw new IOException(e);
+    }
+
+    JobContext context = ShimLoader.getHadoopShims().newJobContext(Job.getInstance(jobConf));
+    Path[] tablePaths = FileInputFormat.getInputPaths(context);
+
+    try {
+      final Connector connector = accumuloParams.getConnector(instance);
+      final List<ColumnMapping> columnMappings = columnMapper.getColumnMappings();
+      final List<IteratorSetting> iterators = predicateHandler.getIterators(jobConf, columnMapper);
+      final Collection<Range> ranges = predicateHandler.getRanges(jobConf, columnMapper);
+
+      // Setting an empty collection of ranges will, unexpectedly, scan all data
+      // We don't want that.
+      if (null != ranges && ranges.isEmpty()) {
+        return new InputSplit[0];
+      }
+
+      // Set the relevant information in the Configuration for the AccumuloInputFormat
+      configure(jobConf, instance, connector, accumuloParams, columnMapper, iterators, ranges);
+
+      int numColumns = columnMappings.size();
+
+      List<Integer> readColIds = ColumnProjectionUtils.getReadColumnIDs(jobConf);
+
+      // Sanity check
+      if (numColumns < readColIds.size()) {
+        throw new IOException("Number of column mappings (" + numColumns
+            + ") is less than the number of Hive table columns to read ("
+            + readColIds.size() + ")");
+      }
+
+      // get splits from Accumulo
+      InputSplit[] splits = accumuloInputFormat.getSplits(jobConf, numSplits);
+
+      HiveAccumuloSplit[] hiveSplits = new HiveAccumuloSplit[splits.length];
+      for (int i = 0; i < splits.length; i++) {
+        RangeInputSplit ris = (RangeInputSplit) splits[i];
+        hiveSplits[i] = new HiveAccumuloSplit(ris, tablePaths[0]);
+      }
+
+      return hiveSplits;
+    } catch (AccumuloException e) {
+      log.error("Could not configure AccumuloInputFormat", e);
+      throw new IOException(StringUtils.stringifyException(e));
+    } catch (AccumuloSecurityException e) {
+      log.error("Could not configure AccumuloInputFormat", e);
+      throw new IOException(StringUtils.stringifyException(e));
+    } catch (SerDeException e) {
+      log.error("Could not configure AccumuloInputFormat", e);
+      throw new IOException(StringUtils.stringifyException(e));
+    }
+  }
+
+  /**
+   * Set up the Accumulo input format from conf properties and delegate to the final RecordReader
+   * from the mapred package.
+   *
+   * @param inputSplit
+   *          The {@link HiveAccumuloSplit} to read
+   * @param jobConf
+   *          Job configuration carrying Accumulo connection and column mapping information
+   * @param reporter
+   *          Progress reporter
+   * @return a RecordReader which produces one {@link AccumuloHiveRow} per Accumulo row
+   * @throws IOException
+   *           If the column mapping or record reader cannot be created
+   */
+  @Override
+  public RecordReader<Text,AccumuloHiveRow> getRecordReader(InputSplit inputSplit,
+      final JobConf jobConf, final Reporter reporter) throws IOException {
+    final ColumnMapper columnMapper;
+    try {
+      columnMapper = getColumnMapper(jobConf);
+    } catch (TooManyAccumuloColumnsException e) {
+      throw new IOException(e);
+    }
+
+    try {
+      final List<IteratorSetting> iterators = predicateHandler.getIterators(jobConf, columnMapper);
+
+      HiveAccumuloSplit hiveSplit = (HiveAccumuloSplit) inputSplit;
+      RangeInputSplit rangeSplit = hiveSplit.getSplit();
+
+      log.info("Split: " + rangeSplit);
+
+      // The RangeInputSplit *should* have all of the necessary information contained in it
+      // which alleviates us from re-parsing our configuration from the AccumuloStorageHandler
+      // and re-setting it into the Configuration (like we did in getSplits(...)). Thus, it should
+      // be unnecessary to re-invoke configure(...)
+
+      // ACCUMULO-2962 Iterators weren't getting serialized into the InputSplit, but we can
+      // compensate because we still have that info.
+      // Should be fixed in Accumulo 1.5.2 and 1.6.1
+      if (null == rangeSplit.getIterators()
+          || (rangeSplit.getIterators().isEmpty() && !iterators.isEmpty())) {
+        log.debug("Re-setting iterators on InputSplit due to Accumulo bug.");
+        rangeSplit.setIterators(iterators);
+      }
+
+      // ACCUMULO-3015 Like the above, RangeInputSplit should have the table name,
+      // but it may not, so just re-set it if it's null.
+      if (null == getTableName(rangeSplit)) {
+        final AccumuloConnectionParameters accumuloParams = new AccumuloConnectionParameters(
+            jobConf);
+        log.debug("Re-setting table name on InputSplit due to Accumulo bug.");
+        setTableName(rangeSplit, accumuloParams.getAccumuloTableName());
+      }
+
+      final RecordReader<Text,PeekingIterator<Map.Entry<Key,Value>>> recordReader = accumuloInputFormat
+          .getRecordReader(rangeSplit, jobConf, reporter);
+
+      return new HiveAccumuloRecordReader(recordReader, iterators.size());
+    } catch (SerDeException e) {
+      throw new IOException(StringUtils.stringifyException(e));
+    }
+  }
+
+  protected ColumnMapper getColumnMapper(Configuration conf) throws IOException,
+      TooManyAccumuloColumnsException {
+    final String defaultStorageType = conf.get(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE);
+
+    String[] columnNamesArr = conf.getStrings(serdeConstants.LIST_COLUMNS);
+    if (null == columnNamesArr) {
+      throw new IOException(
+          "Hive column names must be provided to InputFormat in the Configuration");
+    }
+    List<String> columnNames = Arrays.asList(columnNamesArr);
+
+    String serializedTypes = conf.get(serdeConstants.LIST_COLUMN_TYPES);
+    if (null == serializedTypes) {
+      throw new IOException(
+          "Hive column types must be provided to InputFormat in the Configuration");
+    }
+    ArrayList<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(serializedTypes);
+
+    return new ColumnMapper(conf.get(AccumuloSerDeParameters.COLUMN_MAPPINGS), defaultStorageType,
+        columnNames, columnTypes);
+  }
+
+  /**
+   * Configure the underlying AccumuloInputFormat
+   *
+   * @param conf
+   *          Job configuration
+   * @param instance
+   *          Accumulo instance
+   * @param connector
+   *          Accumulo connector
+   * @param accumuloParams
+   *          Connection information to the Accumulo instance
+   * @param columnMapper
+   *          Configuration of Hive to Accumulo columns
+   * @param iterators
+   *          Any iterators to be configured server-side
+   * @param ranges
+   *          Accumulo ranges on for the query
+   * @throws AccumuloSecurityException
+   * @throws AccumuloException
+   * @throws SerDeException
+   */
+  protected void configure(JobConf conf, Instance instance, Connector connector,
+      AccumuloConnectionParameters accumuloParams, ColumnMapper columnMapper,
+      List<IteratorSetting> iterators, Collection<Range> ranges) throws AccumuloSecurityException,
+      AccumuloException, SerDeException {
+
+    // Handle implementation of Instance and invoke appropriate InputFormat method
+    if (instance instanceof MockInstance) {
+      setMockInstance(conf, instance.getInstanceName());
+    } else {
+      setZooKeeperInstance(conf, instance.getInstanceName(), instance.getZooKeepers());
+    }
+
+    // Set the username/passwd for the Accumulo connection
+    setConnectorInfo(conf, accumuloParams.getAccumuloUserName(),
+        new PasswordToken(accumuloParams.getAccumuloPassword()));
+
+    // Read from the given Accumulo table
+    setInputTableName(conf, accumuloParams.getAccumuloTableName());
+
+    // Check Configuration for any user-provided Authorization definition
+    Authorizations auths = AccumuloSerDeParameters.getAuthorizationsFromConf(conf);
+
+    if (null == auths) {
+      // Default to all of user's authorizations when no configuration is provided
+      auths = connector.securityOperations().getUserAuthorizations(
+          accumuloParams.getAccumuloUserName());
+    }
+
+    // Implicitly handles users providing invalid authorizations
+    setScanAuthorizations(conf, auths);
+
+    // restrict with any filters found from WHERE predicates.
+    addIterators(conf, iterators);
+
+    // restrict with any ranges found from WHERE predicates.
+    // not setting ranges scans the entire table
+    if (null != ranges) {
+      log.info("Setting ranges: " + ranges);
+      setRanges(conf, ranges);
+    }
+
+    // Restrict the set of columns that we want to read from the Accumulo table
+    HashSet<Pair<Text,Text>> pairs = getPairCollection(columnMapper.getColumnMappings());
+    if (null != pairs && !pairs.isEmpty()) {
+      fetchColumns(conf, pairs);
+    }
+  }
+
+  // Wrap the static AccumuloInputFormat methods with methods that we can
+  // verify were correctly called via Mockito
+
+  protected void setMockInstance(JobConf conf, String instanceName) {
+    try {
+      AccumuloInputFormat.setMockInstance(conf, instanceName);
+    } catch (IllegalStateException e) {
+      // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
+      log.debug("Ignoring exception setting mock instance of " + instanceName, e);
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  protected void setZooKeeperInstance(JobConf conf, String instanceName, String zkHosts) {
+    // To support builds against 1.5, we can't use the new 1.6 setZooKeeperInstance which
+    // takes a ClientConfiguration class that only exists in 1.6
+    try {
+      AccumuloInputFormat.setZooKeeperInstance(conf, instanceName, zkHosts);
+    } catch (IllegalStateException ise) {
+      // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
+      log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at "
+          + zkHosts, ise);
+    }
+  }
+
+  protected void setConnectorInfo(JobConf conf, String user, AuthenticationToken token)
+      throws AccumuloSecurityException {
+    try {
+      AccumuloInputFormat.setConnectorInfo(conf, user, token);
+    } catch (IllegalStateException e) {
+      // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
+      log.debug("Ignoring exception setting Accumulo Connector instance for user " + user, e);
+    }
+  }
+
+  protected void setInputTableName(JobConf conf, String tableName) {
+    AccumuloInputFormat.setInputTableName(conf, tableName);
+  }
+
+  protected void setScanAuthorizations(JobConf conf, Authorizations auths) {
+    AccumuloInputFormat.setScanAuthorizations(conf, auths);
+  }
+
+  protected void addIterators(JobConf conf, List<IteratorSetting> iterators) {
+    for (IteratorSetting is : iterators) {
+      AccumuloInputFormat.addIterator(conf, is);
+    }
+  }
+
+  protected void setRanges(JobConf conf, Collection<Range> ranges) {
+    AccumuloInputFormat.setRanges(conf, ranges);
+  }
+
+  protected void fetchColumns(JobConf conf, Set<Pair<Text,Text>> cfCqPairs) {
+    AccumuloInputFormat.fetchColumns(conf, cfCqPairs);
+  }
+
+  /**
+   * Create column family/qualifier pairs from the column mappings, usually derived from the
+   * config object. Ignores the rowID mapping.
+   *
+   * @param columnMappings
+   *          The list of ColumnMappings for the given query
+   * @return a Set of Pairs of colfams and colquals
+   */
+  protected HashSet<Pair<Text,Text>> getPairCollection(List<ColumnMapping> columnMappings) {
+    final HashSet<Pair<Text,Text>> pairs = new HashSet<Pair<Text,Text>>();
+
+    for (ColumnMapping columnMapping : columnMappings) {
+      if (columnMapping instanceof HiveAccumuloColumnMapping) {
+        HiveAccumuloColumnMapping accumuloColumnMapping = (HiveAccumuloColumnMapping) columnMapping;
+
+        Text cf = new Text(accumuloColumnMapping.getColumnFamily());
+        Text cq = null;
+
+        // A null cq implies an empty column qualifier
+        if (null != accumuloColumnMapping.getColumnQualifier()) {
+          cq = new Text(accumuloColumnMapping.getColumnQualifier());
+        }
+
+        pairs.add(new Pair<Text,Text>(cf, cq));
+      } else if (columnMapping instanceof HiveAccumuloMapColumnMapping) {
+        HiveAccumuloMapColumnMapping mapMapping = (HiveAccumuloMapColumnMapping) columnMapping;
+
+        // Can't fetch prefix on colqual, must pull the entire qualifier
+        // TODO use an iterator to do the filter, server-side.
+        pairs.add(new Pair<Text,Text>(new Text(mapMapping.getColumnFamily()), null));
+      }
+    }
+
+    log.info("Computed columns to fetch (" + pairs + ") from " + columnMappings);
+
+    return pairs;
+  }
+
+  /**
+   * Reflection to work around Accumulo 1.5 and 1.6 incompatibilities. Throws an {@link IOException}
+   * for any reflection related exceptions
+   *
+   * @param split
+   *          A RangeInputSplit
+   * @return The name of the table from the split
+   * @throws IOException
+   */
+  protected String getTableName(RangeInputSplit split) throws IOException {
+    // ACCUMULO-3017 shenanigans with method names changing without deprecation
+    Method getTableName = null;
+    try {
+      getTableName = RangeInputSplit.class.getMethod("getTableName");
+    } catch (SecurityException e) {
+      log.debug("Could not get getTableName method from RangeInputSplit", e);
+    } catch (NoSuchMethodException e) {
+      log.debug("Could not get getTableName method from RangeInputSplit", e);
+    }
+
+    if (null != getTableName) {
+      try {
+        return (String) getTableName.invoke(split);
+      } catch (IllegalArgumentException e) {
+        log.debug("Could not invoke getTableName method from RangeInputSplit", e);
+      } catch (IllegalAccessException e) {
+        log.debug("Could not invoke getTableName method from RangeInputSplit", e);
+      } catch (InvocationTargetException e) {
+        log.debug("Could not invoke getTableName method from RangeInputSplit", e);
+      }
+    }
+
+    Method getTable;
+    try {
+      getTable = RangeInputSplit.class.getMethod("getTable");
+    } catch (SecurityException e) {
+      throw new IOException("Could not get table name from RangeInputSplit", e);
+    } catch (NoSuchMethodException e) {
+      throw new IOException("Could not get table name from RangeInputSplit", e);
+    }
+
+    try {
+      return (String) getTable.invoke(split);
+    } catch (IllegalArgumentException e) {
+      throw new IOException("Could not get table name from RangeInputSplit", e);
+    } catch (IllegalAccessException e) {
+      throw new IOException("Could not get table name from RangeInputSplit", e);
+    } catch (InvocationTargetException e) {
+      throw new IOException("Could not get table name from RangeInputSplit", e);
+    }
+  }
+
+  /**
+   * Sets the table name on a RangeInputSplit, accounting for change in method name. Any reflection
+   * related exception is wrapped in an {@link IOException}
+   *
+   * @param split
+   *          The RangeInputSplit to operate on
+   * @param tableName
+   *          The name of the table to set
+   * @throws IOException
+   */
+  protected void setTableName(RangeInputSplit split, String tableName) throws IOException {
+    // ACCUMULO-3017 shenanigans with method names changing without deprecation
+    Method setTableName = null;
+    try {
+      setTableName = RangeInputSplit.class.getMethod("setTableName", String.class);
+    } catch (SecurityException e) {
+      log.debug("Could not get setTableName method from RangeInputSplit", e);
+    } catch (NoSuchMethodException e) {
+      log.debug("Could not get setTableName method from RangeInputSplit", e);
+    }
+
+    if (null != setTableName) {
+      try {
+        setTableName.invoke(split, tableName);
+        return;
+      } catch (IllegalArgumentException e) {
+        log.debug("Could not invoke setTableName method from RangeInputSplit", e);
+      } catch (IllegalAccessException e) {
+        log.debug("Could not invoke setTableName method from RangeInputSplit", e);
+      } catch (InvocationTargetException e) {
+        log.debug("Could not invoke setTableName method from RangeInputSplit", e);
+      }
+    }
+
+    Method setTable;
+    try {
+      setTable = RangeInputSplit.class.getMethod("setTable", String.class);
+    } catch (SecurityException e) {
+      throw new IOException("Could not set table name on RangeInputSplit", e);
+    } catch (NoSuchMethodException e) {
+      throw new IOException("Could not set table name on RangeInputSplit", e);
+    }
+
+    try {
+      setTable.invoke(split, tableName);
+    } catch (IllegalArgumentException e) {
+      throw new IOException("Could not set table name on RangeInputSplit", e);
+    } catch (IllegalAccessException e) {
+      throw new IOException("Could not set table name on RangeInputSplit", e);
+    } catch (InvocationTargetException e) {
+      throw new IOException("Could not set table name on RangeInputSplit", e);
+    }
+  }
+}
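
The getTableName/setTableName workaround generalizes to a small helper; a sketch (not
in the commit) of the same try-the-new-name, fall-back-to-the-old pattern:

    import java.lang.reflect.Method;

    public class ReflectiveCallSketch {
      // Invoke the first method that exists on the target, e.g.
      // callFirstAvailable(split, "getTableName", "getTable") per ACCUMULO-3017.
      static Object callFirstAvailable(Object target, String... names) throws Exception {
        for (String name : names) {
          try {
            Method m = target.getClass().getMethod(name);
            return m.invoke(target);
          } catch (NoSuchMethodException e) {
            // this candidate doesn't exist in this Accumulo version; try the next
          }
        }
        throw new NoSuchMethodException("no candidate method found on " + target.getClass());
      }
    }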

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.mr;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.apache.hadoop.mapred.JobConf;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An OutputFormat which configures the underlying {@link AccumuloOutputFormat} from the Hive job
+ * configuration before delegating to it.
+ */
+public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat {
+
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+    configureAccumuloOutputFormat(job);
+
+    super.checkOutputSpecs(ignored, job);
+  }
+
+  protected void configureAccumuloOutputFormat(JobConf job) throws IOException {
+    AccumuloConnectionParameters cnxnParams = new AccumuloConnectionParameters(job);
+
+    final String tableName = job.get(AccumuloSerDeParameters.TABLE_NAME);
+
+    // Make sure we actually got the table name
+    Preconditions.checkNotNull(tableName,
+        "Expected Accumulo table name to be provided in job configuration");
+
+    // Set the necessary Accumulo information
+    try {
+      // Username/passwd for Accumulo
+      setAccumuloConnectorInfo(job, cnxnParams.getAccumuloUserName(),
+          new PasswordToken(cnxnParams.getAccumuloPassword()));
+
+      if (cnxnParams.useMockInstance()) {
+        setAccumuloMockInstance(job, cnxnParams.getAccumuloInstanceName());
+      } else {
+        // Accumulo instance name with ZK quorum
+        setAccumuloZooKeeperInstance(job, cnxnParams.getAccumuloInstanceName(),
+            cnxnParams.getZooKeepers());
+      }
+
+      // Set the table where we're writing this data
+      setDefaultAccumuloTableName(job, tableName);
+    } catch (AccumuloSecurityException e) {
+      log.error("Could not connect to Accumulo with provided credentials", e);
+      throw new IOException(e);
+    }
+  }
+
+  // Non-static methods to wrap the static AccumuloOutputFormat methods to enable testing
+
+  protected void setAccumuloConnectorInfo(JobConf conf, String username, AuthenticationToken token)
+      throws AccumuloSecurityException {
+    AccumuloOutputFormat.setConnectorInfo(conf, username, token);
+  }
+
+  @SuppressWarnings("deprecation")
+  protected void setAccumuloZooKeeperInstance(JobConf conf, String instanceName, String zookeepers) {
+    AccumuloOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers);
+  }
+
+  protected void setAccumuloMockInstance(JobConf conf, String instanceName) {
+    AccumuloOutputFormat.setMockInstance(conf, instanceName);
+  }
+
+  protected void setDefaultAccumuloTableName(JobConf conf, String tableName) {
+    AccumuloOutputFormat.setDefaultTableName(conf, tableName);
+  }
+}

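As a usage note: the output format above is driven entirely by properties on the JobConf. The
sketch below shows the minimal configuration it expects. The literal key strings are assumptions
for illustration; the authoritative names are the constants defined in
AccumuloConnectionParameters and AccumuloSerDeParameters.

    import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableOutputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class OutputFormatConfigSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();

        // Connection parameters read via AccumuloConnectionParameters
        // (key names here are assumptions for illustration)
        job.set("accumulo.user.name", "root");
        job.set("accumulo.user.pass", "secret");
        job.set("accumulo.instance.name", "hive-accumulo");
        job.set("accumulo.zookeepers", "zk1:2181,zk2:2181");

        // Destination table; configureAccumuloOutputFormat fails fast without it
        job.set("accumulo.table.name", "hive_output");

        // Pushes the settings into the static AccumuloOutputFormat methods and
        // then runs the parent checks; a reachable Accumulo instance is needed
        // for credential validation to succeed.
        new HiveAccumuloTableOutputFormat().checkOutputSpecs(null, job);
      }
    }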
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,4 @@
+/**
+ * SerDe and InputFormat support for connecting Hive to Accumulo tables.
+ */
+package org.apache.hadoop.hive.accumulo;
\ No newline at end of file

Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,408 @@
+package org.apache.hadoop.hive.accumulo.predicate;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.Range;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMapper;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping;
+import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
+import org.apache.hadoop.hive.accumulo.predicate.compare.DoubleCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.Equal;
+import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThan;
+import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThanOrEqual;
+import org.apache.hadoop.hive.accumulo.predicate.compare.IntCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LessThan;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LessThanOrEqual;
+import org.apache.hadoop.hive.accumulo.predicate.compare.Like;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LongCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.NotEqual;
+import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;
+import org.apache.hadoop.hive.accumulo.predicate.compare.StringCompare;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
+import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.udf.UDFLike;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ *
+ * Supports Hive predicate pushdown to Accumulo iterators and ranges.
+ *
+ * See {@link PrimitiveComparisonFilter}.
+ *
+ */
+public class AccumuloPredicateHandler {
+  private static final List<Range> TOTAL_RANGE = Collections.singletonList(new Range());
+
+  private static AccumuloPredicateHandler handler = new AccumuloPredicateHandler();
+  private static Map<String,Class<? extends CompareOp>> compareOps = Maps.newHashMap();
+  private static Map<String,Class<? extends PrimitiveComparison>> pComparisons = Maps.newHashMap();
+
+  // Want to start sufficiently high in the iterator priority stack
+  private static int iteratorCount = 50;
+
+  private static final Logger log = Logger.getLogger(AccumuloPredicateHandler.class);
+  static {
+    compareOps.put(GenericUDFOPEqual.class.getName(), Equal.class);
+    compareOps.put(GenericUDFOPNotEqual.class.getName(), NotEqual.class);
+    compareOps.put(GenericUDFOPGreaterThan.class.getName(), GreaterThan.class);
+    compareOps.put(GenericUDFOPEqualOrGreaterThan.class.getName(), GreaterThanOrEqual.class);
+    compareOps.put(GenericUDFOPEqualOrLessThan.class.getName(), LessThanOrEqual.class);
+    compareOps.put(GenericUDFOPLessThan.class.getName(), LessThan.class);
+    compareOps.put(UDFLike.class.getName(), Like.class);
+
+    pComparisons.put("bigint", LongCompare.class);
+    pComparisons.put("int", IntCompare.class);
+    pComparisons.put("double", DoubleCompare.class);
+    pComparisons.put("string", StringCompare.class);
+  }
+
+  public static AccumuloPredicateHandler getInstance() {
+    return handler;
+  }
+
+  /**
+   *
+   * @return set of all UDF class names with matching CompareOp implementations.
+   */
+  public Set<String> cOpKeyset() {
+    return compareOps.keySet();
+  }
+
+  /**
+   *
+   * @return set of all Hive data types with matching PrimitiveComparison implementations.
+   */
+  public Set<String> pComparisonKeyset() {
+    return pComparisons.keySet();
+  }
+
+  /**
+   *
+   * @param udfType
+   *          GenericUDF class name to look up the matching CompareOp
+   * @return the CompareOp implementation class registered for the given UDF
+   */
+  public Class<? extends CompareOp> getCompareOpClass(String udfType)
+      throws NoSuchCompareOpException {
+    if (!compareOps.containsKey(udfType))
+      throw new NoSuchCompareOpException("Null compare op for specified key: " + udfType);
+    return compareOps.get(udfType);
+  }
+
+  public CompareOp getCompareOp(String udfType, IndexSearchCondition sc)
+      throws NoSuchCompareOpException, SerDeException {
+    Class<? extends CompareOp> clz = getCompareOpClass(udfType);
+
+    try {
+      return clz.newInstance();
+    } catch (ClassCastException e) {
+      throw new SerDeException("Column type mismatch in WHERE clause "
+          + sc.getComparisonExpr().getExprString() + " found type "
+          + sc.getConstantDesc().getTypeString() + " instead of "
+          + sc.getColumnDesc().getTypeString());
+    } catch (IllegalAccessException e) {
+      throw new SerDeException("Could not instantiate class for WHERE clause", e);
+    } catch (InstantiationException e) {
+      throw new SerDeException("Could not instantiate class for WHERE clause", e);
+    }
+  }
+
+  /**
+   *
+   * @param type
+   *          Hive column type name to look up the matching PrimitiveComparison
+   * @return the PrimitiveComparison implementation class registered for the given type
+   */
+  public Class<? extends PrimitiveComparison> getPrimitiveComparisonClass(String type)
+      throws NoSuchPrimitiveComparisonException {
+    if (!pComparisons.containsKey(type))
+      throw new NoSuchPrimitiveComparisonException("Null primitive comparison for specified key: "
+          + type);
+    return pComparisons.get(type);
+  }
+
+  public PrimitiveComparison getPrimitiveComparison(String type, IndexSearchCondition sc)
+      throws NoSuchPrimitiveComparisonException, SerDeException {
+    Class<? extends PrimitiveComparison> clz = getPrimitiveComparisonClass(type);
+
+    try {
+      return clz.newInstance();
+    } catch (ClassCastException e) {
+      throw new SerDeException("Column type mismatch in WHERE clause "
+          + sc.getComparisonExpr().getExprString() + " found type "
+          + sc.getConstantDesc().getTypeString() + " instead of "
+          + sc.getColumnDesc().getTypeString());
+    } catch (IllegalAccessException e) {
+      throw new SerDeException("Could not instantiate class for WHERE clause", e);
+    } catch (InstantiationException e) {
+      throw new SerDeException("Could not instantiate class for WHERE clause", e);
+    }
+  }
+
+  private AccumuloPredicateHandler() {}
+
+  /**
+   * Loop through the search conditions and build ranges for predicates involving the rowID column, if any.
+   */
+  public List<Range> getRanges(Configuration conf, ColumnMapper columnMapper) throws SerDeException {
+    if (!columnMapper.hasRowIdMapping()) {
+      return TOTAL_RANGE;
+    }
+
+    int rowIdOffset = columnMapper.getRowIdOffset();
+    String[] hiveColumnNamesArr = conf.getStrings(serdeConstants.LIST_COLUMNS);
+
+    if (null == hiveColumnNamesArr) {
+      throw new IllegalArgumentException("Could not find Hive columns in configuration");
+    }
+
+    // Already verified that we should have the rowId mapping
+    String hiveRowIdColumnName = hiveColumnNamesArr[rowIdOffset];
+
+    ExprNodeDesc root = this.getExpression(conf);
+
+    // No expression, therefore scan the whole table
+    if (null == root) {
+      return TOTAL_RANGE;
+    }
+
+    Object result = generateRanges(columnMapper, hiveRowIdColumnName, root);
+
+    if (null == result) {
+      log.info("Calculated null set of ranges, scanning full table");
+      return TOTAL_RANGE;
+    } else if (result instanceof Range) {
+      log.info("Computed a single Range for the query: " + result);
+      return Collections.singletonList((Range) result);
+    } else if (result instanceof List) {
+      log.info("Computed a collection of Ranges for the query: " + result);
+      @SuppressWarnings("unchecked")
+      List<Range> ranges = (List<Range>) result;
+      return ranges;
+    } else {
+      throw new IllegalArgumentException("Unhandled return from Range generation: " + result);
+    }
+  }
+
+  /**
+   * Encapsulates the traversal over some {@link ExprNodeDesc} tree for the generation of Accumulo
+   * Ranges using expressions involving the Hive column mapped to the Accumulo rowid
+   *
+   * @param columnMapper
+   *          Mapping of Hive to Accumulo columns for the query
+   * @param hiveRowIdColumnName
+   *          Name of the hive column mapped to the Accumulo rowid
+   * @param root
+   *          Root of some ExprNodeDesc tree to traverse, the WHERE clause
+   * @return An object representing the result from the ExprNodeDesc tree traversal using the
+   *         AccumuloRangeGenerator
+   */
+  protected Object generateRanges(ColumnMapper columnMapper, String hiveRowIdColumnName, ExprNodeDesc root) {
+    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler,
+        columnMapper.getRowIdMapping(), hiveRowIdColumnName);
+    Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
+        Collections.<Rule,NodeProcessor> emptyMap(), null);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+    ArrayList<Node> roots = new ArrayList<Node>();
+    roots.add(root);
+    HashMap<Node,Object> nodeOutput = new HashMap<Node,Object>();
+
+    try {
+      ogw.startWalking(roots, nodeOutput);
+    } catch (SemanticException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    return nodeOutput.get(root);
+  }
+
+  /**
+   * Loop through search conditions and build iterator settings for predicates involving columns
+   * other than rowID, if any.
+   *
+   * @param conf
+   *          Configuration
+   * @throws SerDeException
+   */
+  public List<IteratorSetting> getIterators(Configuration conf, ColumnMapper columnMapper)
+      throws SerDeException {
+    List<IteratorSetting> itrs = Lists.newArrayList();
+    boolean shouldPushdown = conf.getBoolean(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY,
+        AccumuloSerDeParameters.ITERATOR_PUSHDOWN_DEFAULT);
+    if (!shouldPushdown) {
+      log.info("Iterator pushdown is disabled for this table");
+      return itrs;
+    }
+
+    int rowIdOffset = columnMapper.getRowIdOffset();
+    String[] hiveColumnNamesArr = conf.getStrings(serdeConstants.LIST_COLUMNS);
+
+    if (null == hiveColumnNamesArr) {
+      throw new IllegalArgumentException("Could not find Hive columns in configuration");
+    }
+
+    String hiveRowIdColumnName = null;
+
+    if (rowIdOffset >= 0 && rowIdOffset < hiveColumnNamesArr.length) {
+      hiveRowIdColumnName = hiveColumnNamesArr[rowIdOffset];
+    }
+
+    List<String> hiveColumnNames = Arrays.asList(hiveColumnNamesArr);
+
+    for (IndexSearchCondition sc : getSearchConditions(conf)) {
+      String col = sc.getColumnDesc().getColumn();
+      if (hiveRowIdColumnName == null || !hiveRowIdColumnName.equals(col)) {
+        HiveAccumuloColumnMapping mapping = (HiveAccumuloColumnMapping) columnMapper
+            .getColumnMappingForHiveColumn(hiveColumnNames, col);
+        itrs.add(toSetting(mapping, sc));
+      }
+    }
+    if (log.isInfoEnabled())
+      log.info("num iterators = " + itrs.size());
+    return itrs;
+  }
+
+  /**
+   * Create an IteratorSetting for the right qualifier, constant, CompareOp, and PrimitiveComparison
+   * type.
+   *
+   * @param accumuloColumnMapping
+   *          ColumnMapping to filter
+   * @param sc
+   *          IndexSearchCondition
+   * @return IteratorSetting
+   * @throws SerDeException
+   */
+  public IteratorSetting toSetting(HiveAccumuloColumnMapping accumuloColumnMapping,
+      IndexSearchCondition sc) throws SerDeException {
+    iteratorCount++;
+    final IteratorSetting is = new IteratorSetting(iteratorCount,
+        PrimitiveComparisonFilter.FILTER_PREFIX + iteratorCount, PrimitiveComparisonFilter.class);
+    final String type = sc.getColumnDesc().getTypeString();
+    final String comparisonOpStr = sc.getComparisonOp();
+
+    PushdownTuple tuple;
+    try {
+      tuple = new PushdownTuple(sc, getPrimitiveComparison(type, sc), getCompareOp(comparisonOpStr,
+          sc));
+    } catch (NoSuchPrimitiveComparisonException e) {
+      throw new SerDeException("No configured PrimitiveComparison class for " + type, e);
+    } catch (NoSuchCompareOpException e) {
+      throw new SerDeException("No configured CompareOp class for " + comparisonOpStr, e);
+    }
+
+    is.addOption(PrimitiveComparisonFilter.P_COMPARE_CLASS, tuple.getpCompare().getClass()
+        .getName());
+    is.addOption(PrimitiveComparisonFilter.COMPARE_OPT_CLASS, tuple.getcOpt().getClass().getName());
+    is.addOption(PrimitiveComparisonFilter.CONST_VAL,
+        new String(Base64.encodeBase64(tuple.getConstVal())));
+    is.addOption(PrimitiveComparisonFilter.COLUMN, accumuloColumnMapping.serialize());
+
+    return is;
+  }
+
+  public ExprNodeDesc getExpression(Configuration conf) {
+    String filteredExprSerialized = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+    if (filteredExprSerialized == null)
+      return null;
+
+    return Utilities.deserializeExpression(filteredExprSerialized);
+  }
+
+  /**
+   *
+   * @param conf
+   *          Configuration
+   * @return list of IndexSearchConditions from the filter expression.
+   */
+  public List<IndexSearchCondition> getSearchConditions(Configuration conf) {
+    final List<IndexSearchCondition> sConditions = Lists.newArrayList();
+    ExprNodeDesc filterExpr = getExpression(conf);
+    if (null == filterExpr) {
+      return sConditions;
+    }
+    IndexPredicateAnalyzer analyzer = newAnalyzer(conf);
+    ExprNodeDesc residual = analyzer.analyzePredicate(filterExpr, sConditions);
+    if (residual != null)
+      throw new RuntimeException("Unexpected residual predicate: " + residual.getExprString());
+    return sConditions;
+  }
+
+  /**
+   *
+   * @param conf
+   *          Configuration
+   * @param desc
+   *          predicate expression node.
+   * @return DecomposedPredicate containing translated search conditions the analyzer can support.
+   */
+  public DecomposedPredicate decompose(Configuration conf, ExprNodeDesc desc) {
+    IndexPredicateAnalyzer analyzer = newAnalyzer(conf);
+    List<IndexSearchCondition> sConditions = new ArrayList<IndexSearchCondition>();
+    ExprNodeDesc residualPredicate = analyzer.analyzePredicate(desc, sConditions);
+
+    if (sConditions.size() == 0) {
+      if (log.isInfoEnabled())
+        log.info("nothing to decompose. Returning");
+      return null;
+    }
+
+    DecomposedPredicate decomposedPredicate = new DecomposedPredicate();
+    decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(sConditions);
+    decomposedPredicate.residualPredicate = (ExprNodeGenericFuncDesc) residualPredicate;
+    return decomposedPredicate;
+  }
+
+  /**
+   * Build an analyzer that allows comparison ops from the compareOps map, and all columns from the
+   * table definition.
+   */
+  private IndexPredicateAnalyzer newAnalyzer(Configuration conf) {
+    IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
+    analyzer.clearAllowedColumnNames();
+    for (String op : cOpKeyset()) {
+      analyzer.addComparisonOp(op);
+    }
+
+    String[] hiveColumnNames = conf.getStrings(serdeConstants.LIST_COLUMNS);
+    for (String col : hiveColumnNames) {
+      analyzer.allowColumnName(col);
+    }
+
+    return analyzer;
+  }
+}
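
To make the static registration maps concrete: for a predicate such as price > 10.0 over a Hive
double column, the handler resolves the comparison UDF and the column type independently, and
toSetting() later serializes both class names into the PrimitiveComparisonFilter options. A
minimal sketch of that lookup path follows; the method and class names come from the code above,
while the wrapper class itself is hypothetical.

    import org.apache.hadoop.hive.accumulo.predicate.AccumuloPredicateHandler;
    import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
    import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;

    public class PredicateLookupSketch {
      public static void main(String[] args) throws Exception {
        AccumuloPredicateHandler handler = AccumuloPredicateHandler.getInstance();

        // The UDF behind ">" maps to the GreaterThan CompareOp
        Class<? extends CompareOp> op =
            handler.getCompareOpClass(GenericUDFOPGreaterThan.class.getName());

        // The Hive type "double" maps to the DoubleCompare PrimitiveComparison
        Class<? extends PrimitiveComparison> cmp =
            handler.getPrimitiveComparisonClass("double");

        System.out.println(op.getSimpleName());   // GreaterThan
        System.out.println(cmp.getSimpleName());  // DoubleCompare
      }
    }

An unregistered UDF or type surfaces as NoSuchCompareOpException or
NoSuchPrimitiveComparisonException, which toSetting() rewraps as a SerDeException.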