Posted to commits@hive.apache.org by se...@apache.org on 2017/04/20 18:53:29 UTC

[1/2] hive git commit: HIVE-15795 : Support Accumulo Index Tables in Hive Accumulo Connector (Mike Fagan, reviewed by Josh Elser)

Repository: hive
Updated Branches:
  refs/heads/master bde615234 -> 169e65592


http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloIndexParameters.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloIndexParameters.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloIndexParameters.java
new file mode 100644
index 0000000..d295c7b
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloIndexParameters.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo.serde;
+
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.AccumuloDefaultIndexScanner;
+import org.apache.hadoop.hive.accumulo.AccumuloIndexScanner;
+import org.apache.hadoop.hive.accumulo.AccumuloIndexScannerException;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * Accumulo Index Parameters for Hive tables.
+ */
+public class AccumuloIndexParameters {
+  public static final int DEFAULT_MAX_ROWIDS = 20000;
+  public static final String INDEX_SCANNER = "accumulo.index.scanner";
+  public static final String MAX_INDEX_ROWS = "accumulo.index.rows.max";
+  public static final String INDEXED_COLUMNS = "accumulo.indexed.columns";
+  public static final String INDEXTABLE_NAME = "accumulo.indextable.name";
+  private static final Set<String> EMPTY_SET = new HashSet<String>();
+  private Configuration conf;
+
+  public AccumuloIndexParameters(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public String getIndexTable() {
+    return this.conf.get(INDEXTABLE_NAME);
+  }
+
+  public int getMaxIndexRows() {
+    return this.conf.getInt(MAX_INDEX_ROWS, DEFAULT_MAX_ROWIDS);
+  }
+
+  public final Set<String> getIndexColumns() {
+    String colmap = conf.get(INDEXED_COLUMNS);
+    if (colmap != null) {
+      Set<String> cols = new HashSet<String>();
+      for (String col : colmap.split(",")) {
+        cols.add(col.trim());
+      }
+      return cols;
+    }
+    return EMPTY_SET;
+  }
+
+  public final Authorizations getTableAuths() {
+    String auths = conf.get(AccumuloSerDeParameters.AUTHORIZATIONS_KEY);
+    if (auths != null && !auths.isEmpty()) {
+      return new Authorizations(auths.trim().getBytes(StandardCharsets.UTF_8));
+    }
+    return new Authorizations();
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public final AccumuloIndexScanner createScanner() throws AccumuloIndexScannerException {
+    AccumuloIndexScanner handler;
+
+    String classname = conf.get(INDEX_SCANNER);
+    if (classname != null) {
+      try {
+        handler = (AccumuloIndexScanner) Class.forName(classname).newInstance();
+      } catch (ClassCastException | InstantiationException | IllegalAccessException
+          | ClassNotFoundException e) {
+        throw new AccumuloIndexScannerException("Cannot use index scanner class: " + classname, e);
+      }
+    } else {
+      handler = new AccumuloDefaultIndexScanner();
+    }
+    // handler is assigned in both branches; initialize it with the job configuration
+    handler.init(conf);
+    return handler;
+  }
+}
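
For illustration, a minimal usage sketch of the class above. The scanner class
name here ("com.example.MyCustomScanner") is a hypothetical stand-in, not part
of this patch; when accumulo.index.scanner is unset, createScanner() falls back
to AccumuloDefaultIndexScanner:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.accumulo.AccumuloIndexScanner;
    import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters;

    Configuration conf = new Configuration();
    // hypothetical custom implementation of AccumuloIndexScanner
    conf.set(AccumuloIndexParameters.INDEX_SCANNER, "com.example.MyCustomScanner");
    // throws AccumuloIndexScannerException if the class cannot be loaded,
    // instantiated, or cast to AccumuloIndexScanner
    AccumuloIndexScanner scanner = new AccumuloIndexParameters(conf).createScanner();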

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDeParameters.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDeParameters.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDeParameters.java
index 09c5f24..ef454f0 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDeParameters.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDeParameters.java
@@ -58,12 +58,18 @@ public class AccumuloSerDeParameters extends AccumuloConnectionParameters {
 
  public static final String COMPOSITE_ROWID_FACTORY = "accumulo.composite.rowid.factory";
  public static final String COMPOSITE_ROWID_CLASS = "accumulo.composite.rowid";
+  public static final int DEFAULT_MAX_ROWIDS = 20000;
+  public static final String INDEX_SCANNER = "accumulo.index.scanner";
+  public static final String MAX_INDEX_ROWS = "accumulo.index.rows.max";
+  public static final String INDEXED_COLUMNS = "accumulo.indexed.columns";
+  public static final String INDEXTABLE_NAME = "accumulo.indextable.name";
 
   protected final ColumnMapper columnMapper;
 
   private Properties tableProperties;
   private String serdeName;
   private LazySerDeParameters lazySerDeParameters;
+  private AccumuloIndexParameters indexParams;
   private AccumuloRowIdFactory rowIdFactory;
 
   public AccumuloSerDeParameters(Configuration conf, Properties tableProperties, String serdeName)
@@ -73,6 +79,7 @@ public class AccumuloSerDeParameters extends AccumuloConnectionParameters {
     this.serdeName = serdeName;
 
     lazySerDeParameters = new LazySerDeParameters(conf, tableProperties, serdeName);
+    indexParams = new AccumuloIndexParameters(conf);
 
     // The default encoding for this table when not otherwise specified
     String defaultStorage = tableProperties.getProperty(DEFAULT_STORAGE_TYPE);
@@ -135,10 +142,14 @@ public class AccumuloSerDeParameters extends AccumuloConnectionParameters {
     return new DefaultAccumuloRowIdFactory();
   }
 
+  public AccumuloIndexParameters getIndexParams() {
+    return indexParams;
+  }
+
   public LazySerDeParameters getSerDeParameters() {
     return lazySerDeParameters;
   }
 
   public Properties getTableProperties() {
     return tableProperties;
   }
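
As a sketch of how the new index settings are read back through the parameter
class (property values here are made-up examples; getMaxIndexRows() returns
DEFAULT_MAX_ROWIDS, 20000, when accumulo.index.rows.max is unset):

    import java.util.Set;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters;

    Configuration conf = new Configuration();
    conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "name,age");
    conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, "contact_idx");
    AccumuloIndexParameters idx = new AccumuloIndexParameters(conf);
    Set<String> cols = idx.getIndexColumns();  // ["name", "age"]
    int maxRows = idx.getMaxIndexRows();       // 20000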

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/package-info.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/package-info.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/package-info.java
new file mode 100644
index 0000000..7311e87
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * SerDe classes for the Hive Accumulo storage handler.
+ */
+package org.apache.hadoop.hive.accumulo.serde;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java
new file mode 100644
index 0000000..7d6cc0e
--- /dev/null
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestAccumuloDefaultIndexScanner {
+  private static final Logger LOG = LoggerFactory.getLogger(TestAccumuloDefaultIndexScanner.class);
+  private static final Value EMPTY_VALUE = new Value();
+
+  private static void addRow(BatchWriter writer, String rowId, String cf, String cq) throws MutationsRejectedException {
+    Mutation mut = new Mutation(rowId);
+    mut.put(new Text(cf), new Text(cq), EMPTY_VALUE);
+    writer.addMutation(mut);
+  }
+
+  private static void addRow(BatchWriter writer, Integer rowId, String cf, String cq) throws MutationsRejectedException {
+    Mutation mut = new Mutation(AccumuloIndexLexicoder.encodeValue(String.valueOf(rowId).getBytes(), "int", true));
+    mut.put(new Text(cf), new Text(cq), EMPTY_VALUE);
+    writer.addMutation(mut);
+  }
+
+  private static void addRow(BatchWriter writer, boolean rowId, String cf, String cq) throws MutationsRejectedException {
+    Mutation mut = new Mutation(String.valueOf(rowId));
+    mut.put(new Text(cf), new Text(cq), EMPTY_VALUE);
+    writer.addMutation(mut);
+  }
+
+  public static AccumuloDefaultIndexScanner buildMockHandler(int maxMatches) {
+    try {
+      String table = "table";
+      Text emptyText = new Text("");
+      Configuration conf = new Configuration();
+      conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, table);
+      conf.setInt(AccumuloIndexParameters.MAX_INDEX_ROWS, maxMatches);
+      conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "*");
+      conf.set(serdeConstants.LIST_COLUMNS, "rid,name,age,cars,mgr");
+      conf.set(AccumuloSerDeParameters.COLUMN_MAPPINGS, ":rowId,name:name,age:age,cars:cars,mgr:mgr");
+      AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+      handler.init(conf);
+
+      MockInstance inst = new MockInstance("test_instance");
+      Connector conn = inst.getConnector("root", new PasswordToken(""));
+      if (!conn.tableOperations().exists(table)) {
+        conn.tableOperations().create(table);
+        BatchWriterConfig batchConfig = new BatchWriterConfig();
+        BatchWriter writer = conn.createBatchWriter(table, batchConfig);
+        addRow(writer, "fred", "name_name", "row1");
+        addRow(writer, "25", "age_age", "row1");
+        addRow(writer, 5, "cars_cars", "row1");
+        addRow(writer, true, "mgr_mgr", "row1");
+        addRow(writer, "bill", "name_name", "row2");
+        addRow(writer, "20", "age_age", "row2");
+        addRow(writer, 2, "cars_cars", "row2");
+        addRow(writer, false, "mgr_mgr", "row2");
+        addRow(writer, "sally", "name_name", "row3");
+        addRow(writer, "23", "age_age", "row3");
+        addRow(writer, 6, "cars_cars", "row3");
+        addRow(writer, true, "mgr_mgr", "row3");
+        addRow(writer, "rob", "name_name", "row4");
+        addRow(writer, "60", "age_age", "row4");
+        addRow(writer, 1, "cars_cars", "row4");
+        addRow(writer, false, "mgr_mgr", "row4");
+        writer.close();
+      }
+      AccumuloConnectionParameters connectionParams = Mockito
+          .mock(AccumuloConnectionParameters.class);
+
+      Mockito.when(connectionParams.getConnector()).thenReturn(conn);
+      handler.setConnectParams(connectionParams);
+      return handler;
+    } catch (AccumuloSecurityException | AccumuloException | TableExistsException | TableNotFoundException e) {
+      LOG.error(e.getLocalizedMessage(), e);
+    }
+    return null;
+  }
+
+  @Test
+  public void testMatchNone() {
+    AccumuloDefaultIndexScanner handler = buildMockHandler(10);
+    List<Range> ranges = handler.getIndexRowRanges("name", new Range("mike"));
+    assertEquals(0, ranges.size());
+  }
+
+  @Test
+  public void testMatchRange() {
+    AccumuloDefaultIndexScanner handler = buildMockHandler(10);
+    List<Range> ranges = handler.getIndexRowRanges("age", new Range("10", "50"));
+    assertEquals(3, ranges.size());
+    assertTrue("does not contain row1", ranges.contains(new Range("row1")));
+    assertTrue("does not contain row2", ranges.contains(new Range("row2")));
+    assertTrue("does not contain row3", ranges.contains(new Range("row3")));
+  }
+
+  @Test
+  public void testTooManyMatches() {
+    AccumuloDefaultIndexScanner handler = buildMockHandler(2);
+    List<Range> ranges = handler.getIndexRowRanges("age", new Range("10", "50"));
+    assertNull("ranges should be null", ranges);
+  }
+
+  @Test
+  public void testMatchExact() {
+    AccumuloDefaultIndexScanner handler = buildMockHandler(10);
+    List<Range> ranges = handler.getIndexRowRanges("age", new Range("20"));
+    assertEquals(1, ranges.size());
+    assertTrue("does not contain row2", ranges.contains(new Range("row2")));
+  }
+
+  @Test
+  public void testValidIndex() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "name,age,phone,email");
+    conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, "contact");
+    AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+    handler.init(conf);
+    assertTrue("name is not identified as an index", handler.isIndexed("name"));
+    assertTrue("age is not identified as an index", handler.isIndexed("age"));
+    assertTrue("phone is not identified as an index", handler.isIndexed("phone"));
+    assertTrue("email is not identified as an index", handler.isIndexed("email"));
+  }
+
+  @Test
+  public void testInvalidIndex() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "name,age,phone,email");
+    conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, "contact");
+    AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+    handler.init(conf);
+    assertFalse("mobile is identified as an index", handler.isIndexed("mobile"));
+    assertFalse("mail is identified as an index", handler.isIndexed("mail"));
+  }
+
+
+  @Test
+  public void testMissingTable() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "name,age,phone,email");
+    AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+    handler.init(conf);
+    assertFalse("name is identified as an index", handler.isIndexed("name"));
+    assertFalse("age is identified as an index", handler.isIndexed("age"));
+  }
+
+  @Test
+  public void testWildcardIndex() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "*");
+    conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, "contact");
+    AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+    handler.init(conf);
+    assertTrue("name is not identified as an index", handler.isIndexed("name"));
+    assertTrue("age is not identified as an index", handler.isIndexed("age"));
+  }
+
+  @Test
+  public void testNullIndex() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, "contact");
+    AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+    handler.init(conf);
+    assertTrue("name is not identified as an index", handler.isIndexed("name"));
+  }
+
+  @Test
+  public void testEmptyIndex() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "");
+    conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, "contact");
+    AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+    handler.init(conf);
+    assertFalse("name is identified as an index", handler.isIndexed("name"));
+  }
+}
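
For reference, the index-table layout these tests assume: the Accumulo row of
an index entry is the indexed column's value (lexicoder-encoded for binary
storage), the column family is the data table's "family_qualifier" pair for
that column, and the column qualifier is the rowid of the matching data row.
A minimal sketch of writing one such entry, assuming writer is a BatchWriter
on the index table:

    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.io.Text;

    // value "fred" in data column name:name points back to data row "row1"
    Mutation mut = new Mutation("fred");
    mut.put(new Text("name_name"), new Text("row1"), new Value());
    writer.addMutation(mut);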

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexLexicoder.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexLexicoder.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexLexicoder.java
new file mode 100644
index 0000000..1eda364
--- /dev/null
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexLexicoder.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo;
+
+import org.apache.accumulo.core.client.lexicoder.BigIntegerLexicoder;
+import org.apache.accumulo.core.client.lexicoder.DoubleLexicoder;
+import org.apache.accumulo.core.client.lexicoder.IntegerLexicoder;
+import org.apache.accumulo.core.client.lexicoder.LongLexicoder;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests lexicoder encoding of Hive column values for the Accumulo index table.
+ */
+public class TestAccumuloIndexLexicoder {
+
+  @Test
+  public void testBooleanString() {
+    byte[] value = Boolean.TRUE.toString().getBytes(UTF_8);
+    assertArrayEquals(AccumuloIndexLexicoder.encodeValue(value, serdeConstants.BOOLEAN_TYPE_NAME,
+        true), value);
+  }
+
+  @Test
+  public void testBooleanBinary() {
+    byte[] value = new byte[] { 1 };
+    assertArrayEquals(AccumuloIndexLexicoder.encodeValue(value, serdeConstants.BOOLEAN_TYPE_NAME,
+        false), Boolean.TRUE.toString().getBytes(UTF_8));
+  }
+
+  @Test
+  public void testIntString() {
+    byte[] value = "10".getBytes(UTF_8);
+    byte[] encoded = new IntegerLexicoder().encode(10);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.INT_TYPE_NAME, true);
+    assertArrayEquals(lex, encoded);
+
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.SMALLINT_TYPE_NAME, true);
+    assertArrayEquals(lex, encoded);
+
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.TINYINT_TYPE_NAME, true);
+    assertArrayEquals(lex, encoded);
+  }
+
+  @Test
+  public void testIntBinary() {
+    byte[] value = ByteBuffer.allocate(4).putInt(10).array();
+    byte[] encoded = new IntegerLexicoder().encode(10);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.INT_TYPE_NAME, false);
+    assertArrayEquals(lex, encoded);
+
+    value = ByteBuffer.allocate(2).putShort((short) 10).array();
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.SMALLINT_TYPE_NAME, false);
+    assertArrayEquals(lex, encoded);
+
+    value = ByteBuffer.allocate(1).put((byte)10).array();
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.TINYINT_TYPE_NAME, false);
+    assertArrayEquals(lex, encoded);
+  }
+
+  @Test
+  public void testFloatBinary() {
+    byte[] value = ByteBuffer.allocate(4).putFloat(10.55f).array();
+    byte[] encoded = new DoubleLexicoder().encode((double)10.55f);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.FLOAT_TYPE_NAME, false);
+    assertArrayEquals(lex, encoded);
+
+    value = ByteBuffer.allocate(8).putDouble(10.55).array();
+    encoded = new DoubleLexicoder().encode(10.55);
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.DOUBLE_TYPE_NAME, false);
+    assertArrayEquals(lex, encoded);
+  }
+
+  @Test
+  public void testFloatString() {
+    byte[] value = "10.55".getBytes(UTF_8);
+    byte[] encoded = new DoubleLexicoder().encode(10.55);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.FLOAT_TYPE_NAME, true);
+    assertArrayEquals(lex, encoded);
+
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.DOUBLE_TYPE_NAME, true);
+    assertArrayEquals(lex, encoded);
+  }
+
+  @Test
+  public void testBigIntBinary() {
+    byte[] value = ByteBuffer.allocate(8).putLong(1232322323).array();
+    byte[] encoded = new LongLexicoder().encode(1232322323L);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.BIGINT_TYPE_NAME, false);
+    assertArrayEquals(lex, encoded);
+
+    value = new BigInteger( "1232322323", 10 ).toByteArray();
+    encoded = new BigIntegerLexicoder().encode(new BigInteger("1232322323", 10 ));
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.DECIMAL_TYPE_NAME, false);
+    assertArrayEquals(lex, encoded);
+  }
+
+  @Test
+  public void testDecimalString() {
+    String strVal = "12323232233434";
+    byte[] value = strVal.getBytes(UTF_8);
+    byte[] encoded = new BigIntegerLexicoder().encode(new BigInteger(strVal, 10));
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.DECIMAL_TYPE_NAME, true);
+    assertArrayEquals(lex, encoded);
+
+    lex = AccumuloIndexLexicoder.encodeValue(value, "DECIMAL (10,3)", true);
+    assertArrayEquals(lex, encoded);
+  }
+
+  @Test
+  public void testDecimalBinary() {
+    BigInteger value = new BigInteger("12323232233434", 10);
+    byte[] encoded = new BigIntegerLexicoder().encode(value);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value.toByteArray(), serdeConstants.DECIMAL_TYPE_NAME, false);
+    assertArrayEquals(lex, encoded);
+  }
+
+  @Test
+  public void testDateString() {
+    String date = "2016-02-22";
+    byte[] value = date.getBytes(UTF_8);
+    assertArrayEquals(AccumuloIndexLexicoder.encodeValue(value, serdeConstants.DATE_TYPE_NAME,
+                                                        true), value);
+  }
+
+  @Test
+  public void testDateTimeString() {
+    String timestamp = "2016-02-22 12:12:06.000000005";
+    byte[] value = timestamp.getBytes(UTF_8);
+    assertArrayEquals(AccumuloIndexLexicoder.encodeValue(value, serdeConstants.TIMESTAMP_TYPE_NAME,
+                                                        true), value);
+  }
+
+  @Test
+  public void testString() {
+    String strVal = "The quick brown fox";
+    byte[] value = strVal.getBytes(UTF_8);
+    assertArrayEquals(AccumuloIndexLexicoder.encodeValue(value, serdeConstants.STRING_TYPE_NAME,
+                                                        true), value);
+    assertArrayEquals(AccumuloIndexLexicoder.encodeValue(value, "varChar(20)",
+                                                        true), value);
+    assertArrayEquals(AccumuloIndexLexicoder.encodeValue(value, "CHAR (20)",
+                                                        true), value);
+  }
+}
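
The reason numeric types are routed through Accumulo lexicoders, as these
tests verify, is that lexicoder output sorts lexicographically in numeric
order, which is what makes range scans over the index table correct. A small
sketch of that property:

    import org.apache.accumulo.core.client.lexicoder.IntegerLexicoder;

    IntegerLexicoder lex = new IntegerLexicoder();
    byte[] nine = lex.encode(9);
    byte[] ten = lex.encode(10);
    // as plain strings, "10" sorts before "9"; the encoded forms instead
    // compare in true numeric order, so index range scans see the right rows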

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java
new file mode 100644
index 0000000..976fd27
--- /dev/null
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo;
+
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestAccumuloIndexParameters {
+
+  public static class MockAccumuloIndexScanner implements AccumuloIndexScanner {
+
+    @Override
+    public void init(Configuration conf) {
+    }
+
+    @Override
+    public boolean isIndexed(String columnName) {
+      return false;
+    }
+
+    @Override
+    public List<Range> getIndexRowRanges(String column, Range indexRange) {
+      return null;
+    }
+  }
+
+  @Test
+  public void testDefaultScanner() {
+    try {
+      AccumuloIndexScanner scanner = new AccumuloIndexParameters(new Configuration()).createScanner();
+      assertTrue(scanner instanceof AccumuloDefaultIndexScanner);
+    } catch (AccumuloIndexScannerException e) {
+      fail("Unexpected exception thrown");
+    }
+  }
+
+  @Test
+  public void testUserHandler() throws AccumuloIndexScannerException {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEX_SCANNER, MockAccumuloIndexScanner.class.getName());
+    AccumuloIndexScanner scanner = new AccumuloIndexParameters(conf).createScanner();
+    assertTrue(scanner instanceof MockAccumuloIndexScanner);
+  }
+
+  @Test
+  public void testBadHandler() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEX_SCANNER, "a.class.does.not.exist.IndexHandler");
+    try {
+      new AccumuloIndexParameters(conf).createScanner();
+    } catch (AccumuloIndexScannerException e) {
+      return;
+    }
+    fail("Failed to throw exception for class not found");
+  }
+
+  @Test
+  public void getIndexColumns() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "a,b,c");
+    Set<String> cols = new AccumuloIndexParameters(conf).getIndexColumns();
+    assertEquals(3, cols.size());
+    assertTrue("Missing column a", cols.contains("a"));
+    assertTrue("Missing column b", cols.contains("b"));
+    assertTrue("Missing column c", cols.contains("c"));
+  }
+
+  @Test
+  public void getMaxIndexRows() {
+    Configuration conf = new Configuration();
+    conf.setInt(AccumuloIndexParameters.MAX_INDEX_ROWS, 10);
+    int maxRows = new AccumuloIndexParameters(conf).getMaxIndexRows();
+    assertEquals(10, maxRows);
+  }
+
+  @Test
+  public void getAuths() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloSerDeParameters.AUTHORIZATIONS_KEY, "public,open");
+    Authorizations auths = new AccumuloIndexParameters(conf).getTableAuths();
+    assertEquals(2, auths.size());
+    assertTrue("Missing auth public", auths.contains("public"));
+    assertTrue("Missing auth open", auths.contains("open"));
+  }
+
+}
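
Based on the interface exercised above (init, isIndexed, getIndexRowRanges),
a skeletal custom scanner would look like the sketch below. This is an
illustration only, not part of the patch; per the tests, a null result from
getIndexRowRanges is treated as "no index answer, scan everything":

    import java.util.List;
    import org.apache.accumulo.core.data.Range;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.accumulo.AccumuloIndexScanner;

    public class ExampleIndexScanner implements AccumuloIndexScanner {
      @Override
      public void init(Configuration conf) {
        // read any scanner-specific settings from the job configuration
      }

      @Override
      public boolean isIndexed(String columnName) {
        return false;  // sketch: report no indexed columns
      }

      @Override
      public List<Range> getIndexRowRanges(String column, Range indexRange) {
        return null;   // null falls back to a full scan
      }
    }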

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
index 0aaa782..8d195ee 100644
--- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -59,6 +60,8 @@ public class TestAccumuloStorageHandler {
     Map<String,String> jobProperties = new HashMap<String,String>();
 
     props.setProperty(AccumuloSerDeParameters.COLUMN_MAPPINGS, "cf:cq1,cf:cq2,cf:cq3");
+    props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string:int:string");
+    props.setProperty(serdeConstants.LIST_COLUMNS, "name,age,email");
     props.setProperty(AccumuloSerDeParameters.TABLE_NAME, "table");
     props.setProperty(AccumuloSerDeParameters.VISIBILITY_LABEL_KEY, "foo");
 

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java
index 88e4530..0bb50e8 100644
--- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java
@@ -488,6 +488,7 @@ public class TestAccumuloPredicateHandler {
         TypeInfoFactory.intTypeInfo, TypeInfoFactory.stringTypeInfo);
     conf.set(serdeConstants.LIST_COLUMNS, Joiner.on(',').join(columnNames));
     conf.set(serdeConstants.LIST_COLUMN_TYPES, "string,int,string");
+    conf.set(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE, ColumnEncoding.BINARY.getName());
     String columnMappingStr = "cf:f1,cf:f2,:rowID";
     conf.set(AccumuloSerDeParameters.COLUMN_MAPPINGS, columnMappingStr);
     columnMapper = new ColumnMapper(columnMappingStr, ColumnEncoding.STRING.getName(), columnNames,
@@ -758,7 +759,7 @@ public class TestAccumuloPredicateHandler {
     String hiveRowIdColumnName = "rid";
 
     Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod();
-    Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(null);
+    Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root)).thenReturn(null);
     Mockito.when(mockHandler.getExpression(conf)).thenReturn(root);
 
     // A null result from AccumuloRangeGenerator is all ranges
@@ -776,7 +777,8 @@ public class TestAccumuloPredicateHandler {
     String hiveRowIdColumnName = "rid";
 
     Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod();
-    Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(Collections.emptyList());
+    Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root))
+                  .thenReturn(Collections.emptyList());
     Mockito.when(mockHandler.getExpression(conf)).thenReturn(root);
 
     // A null result from AccumuloRangeGenerator is all ranges
@@ -795,7 +797,7 @@ public class TestAccumuloPredicateHandler {
     Range r = new Range("a");
 
     Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod();
-    Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(r);
+    Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root)).thenReturn(r);
     Mockito.when(mockHandler.getExpression(conf)).thenReturn(root);
 
     // A null result from AccumuloRangeGenerator is all ranges
@@ -814,7 +816,8 @@ public class TestAccumuloPredicateHandler {
     Range r1 = new Range("a"), r2 = new Range("z");
 
     Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod();
-    Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(Arrays.asList(r1, r2));
+    Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root))
+                 .thenReturn(Arrays.asList(r1, r2));
     Mockito.when(mockHandler.getExpression(conf)).thenReturn(root);
 
     // A null result from AccumuloRangeGenerator is all ranges

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java
index 339da07..5f3baab 100644
--- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java
@@ -16,20 +16,15 @@
  */
 package org.apache.hadoop.hive.accumulo.predicate;
 
-import static org.junit.Assert.assertNotNull;
-
-import java.sql.Date;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-
+import com.google.common.collect.Lists;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
+import org.apache.hadoop.hive.accumulo.TestAccumuloDefaultIndexScanner;
 import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
 import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloRowIdColumnMapping;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -42,22 +37,29 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.ql.udf.UDFLike;
 import org.apache.hadoop.hive.ql.udf.UDFToString;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPPlus;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertNotNull;
 
 /**
  *
@@ -66,12 +68,14 @@ public class TestAccumuloRangeGenerator {
 
   private AccumuloPredicateHandler handler;
   private HiveAccumuloRowIdColumnMapping rowIdMapping;
+  private Configuration conf;
 
   @Before
   public void setup() {
     handler = AccumuloPredicateHandler.getInstance();
     rowIdMapping = new HiveAccumuloRowIdColumnMapping(AccumuloHiveConstants.ROWID,
-        ColumnEncoding.STRING, "row", TypeInfoFactory.stringTypeInfo.toString());
+        ColumnEncoding.STRING,"row", TypeInfoFactory.stringTypeInfo.toString());
+    conf = new Configuration(true);
   }
 
   @Test
@@ -108,7 +112,7 @@ public class TestAccumuloRangeGenerator {
     List<Range> expectedRanges = Arrays
         .asList(new Range(new Key("f"), true, new Key("m\0"), false));
 
-    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid");
+    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
     Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
         Collections.<Rule,NodeProcessor> emptyMap(), null);
     GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -163,7 +167,7 @@ public class TestAccumuloRangeGenerator {
     // Should generate (-inf,+inf)
     List<Range> expectedRanges = Arrays.asList(new Range());
 
-    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid");
+    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
     Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
         Collections.<Rule,NodeProcessor> emptyMap(), null);
     GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -236,7 +240,7 @@ public class TestAccumuloRangeGenerator {
     // Should generate ['q', +inf)
     List<Range> expectedRanges = Arrays.asList(new Range(new Key("q"), true, null, false));
 
-    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid");
+    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
     Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
         Collections.<Rule,NodeProcessor> emptyMap(), null);
     GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -291,7 +295,7 @@ public class TestAccumuloRangeGenerator {
     // Should generate [f,+inf)
     List<Range> expectedRanges = Arrays.asList(new Range(new Key("f"), true, null, false));
 
-    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid");
+    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
     Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
         Collections.<Rule,NodeProcessor> emptyMap(), null);
     GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -349,7 +353,7 @@ public class TestAccumuloRangeGenerator {
     List<Range> expectedRanges = Arrays.asList(new Range(new Key("2014-01-01"), true, new Key(
         "2014-07-01"), false));
 
-    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid");
+    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
     Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
         Collections.<Rule,NodeProcessor> emptyMap(), null);
     GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -397,7 +401,7 @@ public class TestAccumuloRangeGenerator {
     ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
         new GenericUDFOPEqualOrGreaterThan(), Arrays.asList(key, cast));
 
-    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "key");
+    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "key");
     Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
         Collections.<Rule,NodeProcessor> emptyMap(), null);
     GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -446,7 +450,7 @@ public class TestAccumuloRangeGenerator {
     ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
         new GenericUDFOPAnd(), bothFilters);
 
-    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid");
+    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
     Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
         Collections.<Rule,NodeProcessor> emptyMap(), null);
     GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -464,4 +468,160 @@
     Object result = nodeOutput.get(both);
     Assert.assertNull(result);
   }
+
+  @Test
+  public void testRangeOverStringIndexedField() throws Exception {
+    // age >= '10'
+    ExprNodeDesc column = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "age", null, false);
+    ExprNodeDesc constant = new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "10");
+    List<ExprNodeDesc> children = Lists.newArrayList();
+    children.add(column);
+    children.add(constant);
+    ExprNodeDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
+        new GenericUDFOPEqualOrGreaterThan(), children);
+    assertNotNull(node);
+
+    // age <= '50'
+    ExprNodeDesc column2 = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "age", null,
+        false);
+    ExprNodeDesc constant2 = new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "50");
+    List<ExprNodeDesc> children2 = Lists.newArrayList();
+    children2.add(column2);
+    children2.add(constant2);
+    ExprNodeDesc node2 = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
+        new GenericUDFOPEqualOrLessThan(), children2);
+    assertNotNull(node2);
+
+    // And UDF
+    List<ExprNodeDesc> bothFilters = Lists.newArrayList();
+    bothFilters.add(node);
+    bothFilters.add(node2);
+    ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
+        new GenericUDFOPAnd(), bothFilters);
+
+    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
+    rangeGenerator.setIndexScanner(TestAccumuloDefaultIndexScanner.buildMockHandler(10));
+    Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
+        Collections.<Rule,NodeProcessor> emptyMap(), null);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.add(both);
+    HashMap<Node,Object> nodeOutput = new HashMap<Node,Object>();
+
+    try {
+      ogw.startWalking(topNodes, nodeOutput);
+    } catch (SemanticException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    // Filters are using an index which should match 3 rows
+    Object result = nodeOutput.get(both);
+    if (result instanceof List) {
+      List results = (List) result;
+      Assert.assertEquals(3, results.size());
+      Assert.assertTrue("does not contain row1", results.contains(new Range("row1")));
+      Assert.assertTrue("does not contain row2", results.contains(new Range("row2")));
+      Assert.assertTrue("does not contain row3", results.contains(new Range("row3")));
+    } else {
+      Assert.fail("Results not a list");
+    }
+  }
+
+  @Test
+  public void testRangeOverIntegerIndexedField() throws Exception {
+    // cars >= 2
+    ExprNodeDesc column = new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, "cars", null, false);
+    ExprNodeDesc constant = new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, 2);
+    List<ExprNodeDesc> children = Lists.newArrayList();
+    children.add(column);
+    children.add(constant);
+    ExprNodeDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo,
+        new GenericUDFOPEqualOrGreaterThan(), children);
+    assertNotNull(node);
+
+    //  cars <= 9
+    ExprNodeDesc column2 = new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, "cars", null,
+        false);
+    ExprNodeDesc constant2 = new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, 9);
+    List<ExprNodeDesc> children2 = Lists.newArrayList();
+    children2.add(column2);
+    children2.add(constant2);
+    ExprNodeDesc node2 = new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo,
+        new GenericUDFOPEqualOrLessThan(), children2);
+    assertNotNull(node2);
+
+    // And UDF
+    List<ExprNodeDesc> bothFilters = Lists.newArrayList();
+    bothFilters.add(node);
+    bothFilters.add(node2);
+    ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
+        new GenericUDFOPAnd(), bothFilters);
+
+    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
+    rangeGenerator.setIndexScanner(TestAccumuloDefaultIndexScanner.buildMockHandler(10));
+    Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
+        Collections.<Rule,NodeProcessor> emptyMap(), null);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.add(both);
+    HashMap<Node,Object> nodeOutput = new HashMap<Node,Object>();
+
+    try {
+      ogw.startWalking(topNodes, nodeOutput);
+    } catch (SemanticException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    // Filters are using an index which should match 3 rows
+    Object result = nodeOutput.get(both);
+    if (result instanceof List) {
+      List results = (List) result;
+      Assert.assertEquals(3, results.size());
+      Assert.assertTrue("does not contain row1", results.contains(new Range("row1")));
+      Assert.assertTrue("does not contain row2", results.contains(new Range("row2")));
+      Assert.assertTrue("does not contain row3", results.contains(new Range("row3")));
+    } else {
+      Assert.fail("Results not a list");
+    }
+  }
+
+  @Test
+  public void testRangeOverBooleanIndexedField() throws Exception {
+    // mgr == true
+    ExprNodeDesc column = new ExprNodeColumnDesc(TypeInfoFactory.booleanTypeInfo, "mgr", null, false);
+    ExprNodeDesc constant = new ExprNodeConstantDesc(TypeInfoFactory.booleanTypeInfo, true);
+    List<ExprNodeDesc> children = Lists.newArrayList();
+    children.add(column);
+    children.add(constant);
+    ExprNodeDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo,
+        new GenericUDFOPEqual(), children);
+    assertNotNull(node);
+
+    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
+    rangeGenerator.setIndexScanner(TestAccumuloDefaultIndexScanner.buildMockHandler(10));
+    Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
+        Collections.<Rule,NodeProcessor> emptyMap(), null);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.add(node);
+    HashMap<Node,Object> nodeOutput = new HashMap<Node,Object>();
+
+    try {
+      ogw.startWalking(topNodes, nodeOutput);
+    } catch (SemanticException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    // Filters are using an index which should match 2 rows
+    Object result = nodeOutput.get(node);
+    if (result instanceof List) {
+      List results = (List) result;
+      Assert.assertEquals(2, results.size());
+      Assert.assertTrue("does not contain row1", results.contains(new Range("row1")));
+      Assert.assertTrue("does not contain row3", results.contains(new Range("row3")));
+    } else {
+      Assert.fail("Results not a list");
+    }
+  }
+
 }
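
Taken together, these tests show the wiring pattern for index-assisted
predicate pushdown: the range generator now takes the job Configuration, is
handed an AccumuloIndexScanner, and the graph walker's output for a predicate
over an indexed column is the list of data-table Ranges. In sketch form, with
conf, handler, rowIdMapping, and scanner as in the tests above:

    AccumuloRangeGenerator rangeGenerator =
        new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
    rangeGenerator.setIndexScanner(scanner);
    // after DefaultGraphWalker.startWalking(...), nodeOutput.get(rootExpr) is
    // either a List<Range> of matching data rows or null (scan all ranges)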

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/test/queries/positive/accumulo_index.q
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/queries/positive/accumulo_index.q b/accumulo-handler/src/test/queries/positive/accumulo_index.q
new file mode 100644
index 0000000..52a33af
--- /dev/null
+++ b/accumulo-handler/src/test/queries/positive/accumulo_index.q
@@ -0,0 +1,44 @@
+DROP TABLE accumulo_index_test;
+
+CREATE TABLE accumulo_index_test (
+   rowid string,
+   active boolean,
+   num_offices tinyint,
+   num_personel smallint,
+   total_manhours int,
+   num_shareholders bigint,
+   eff_rating float,
+   err_rating double,
+   yearly_production decimal,
+   start_date date,
+   address varchar(100),
+   phone char(13),
+   last_update timestamp )
+ROW FORMAT SERDE 'org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe'
+STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler'
+WITH SERDEPROPERTIES (
+   "accumulo.columns.mapping" = ":rowID,a:act,a:off,a:per,a:mhs,a:shs,a:eff,a:err,a:yp,a:sd,a:addr,a:ph,a:lu",
+   "accumulo.table.name"="accumulo_index_test",
+   "accumulo.indexed.columns"="*",
+   "accumulo.indextable.name"="accumulo_index_idx"
+ );
+
+
+insert into accumulo_index_test values( "row1", true, 55, 107, 555555, 1223232332,
+                                 4.5, 0.8, 1232223, "2001-10-10", "123 main street",
+                                 "555-555-5555", "2016-02-22 12:45:07.000000000");
+
+select * from accumulo_index_test where active = 'true';
+select * from accumulo_index_test where num_offices = 55;
+select * from accumulo_index_test where num_personel = 107;
+select * from accumulo_index_test where total_manhours < 555556;
+select * from accumulo_index_test where num_shareholders >= 1223232331;
+select * from accumulo_index_test where eff_rating <= 4.5;
+select * from accumulo_index_test where err_rating >= 0.8;
+select * from accumulo_index_test where yearly_production = 1232223;
+select * from accumulo_index_test where start_date = "2001-10-10";
+select * from accumulo_index_test where address >= "100 main street";
+select * from accumulo_index_test where phone <= "555-555-5555";
+select * from accumulo_index_test where last_update >= "2016-02-22 12:45:07";
+
+DROP TABLE accumulo_index_test;

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/test/results/positive/accumulo_index.q.out
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/results/positive/accumulo_index.q.out b/accumulo-handler/src/test/results/positive/accumulo_index.q.out
new file mode 100644
index 0000000..5cb3d73
--- /dev/null
+++ b/accumulo-handler/src/test/results/positive/accumulo_index.q.out
@@ -0,0 +1,180 @@
+PREHOOK: query: DROP TABLE accumulo_index_test
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE accumulo_index_test
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE accumulo_index_test (
+   rowid string,
+   active boolean,
+   num_offices tinyint,
+   num_personel smallint,
+   total_manhours int,
+   num_shareholders bigint,
+   eff_rating float,
+   err_rating double,
+   yearly_production decimal,
+   start_date date,
+   address varchar(100),
+   phone char(13),
+   last_update timestamp )
+ROW FORMAT SERDE 'org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe'
+STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler'
+WITH SERDEPROPERTIES (
+   "accumulo.columns.mapping" = ":rowID,a:act,a:off,a:per,a:mhs,a:shs,a:eff,a:err,a:yp,a:sd,a:addr,a:ph,a:lu",
+   "accumulo.table.name"="accumulo_index_test",
+   "accumulo.indexed.columns"="*",
+   "accumulo.indextable.name"="accumulo_index_idx"
+ )
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@accumulo_index_test
+POSTHOOK: query: CREATE TABLE accumulo_index_test (
+   rowid string,
+   active boolean,
+   num_offices tinyint,
+   num_personel smallint,
+   total_manhours int,
+   num_shareholders bigint,
+   eff_rating float,
+   err_rating double,
+   yearly_production decimal,
+   start_date date,
+   address varchar(100),
+   phone char(13),
+   last_update timestamp )
+ROW FORMAT SERDE 'org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe'
+STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler'
+WITH SERDEPROPERTIES (
+   "accumulo.columns.mapping" = ":rowID,a:act,a:off,a:per,a:mhs,a:shs,a:eff,a:err,a:yp,a:sd,a:addr,a:ph,a:lu",
+   "accumulo.table.name"="accumulo_index_test",
+   "accumulo.indexed.columns"="*",
+   "accumulo.indextable.name"="accumulo_index_idx"
+ )
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@accumulo_index_test
+PREHOOK: query: insert into accumulo_index_test values( "row1", true, 55, 107, 555555, 1223232332,
+                                 4.5, 0.8, 1232223, "2001-10-10", "123 main street",
+                                 "555-555-5555", "2016-02-22 12:45:07.000000000")
+PREHOOK: type: QUERY
+PREHOOK: Output: default@accumulo_index_test
+POSTHOOK: query: insert into accumulo_index_test values( "row1", true, 55, 107, 555555, 1223232332,
+                                 4.5, 0.8, 1232223, "2001-10-10", "123 main street",
+                                 "555-555-5555", "2016-02-22 12:45:07.000000000")
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@accumulo_index_test
+PREHOOK: query: select * from accumulo_index_test where active = 'true'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where active = 'true'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555 	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where num_offices = 55
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where num_offices = 55
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555 	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where num_personel = 107
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where num_personel = 107
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555 	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where total_manhours < 555556
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where total_manhours < 555556
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555 	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where num_shareholders >= 1223232331
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where num_shareholders >= 1223232331
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555 	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where eff_rating <= 4.5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where eff_rating <= 4.5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555 	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where err_rating >= 0.8
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where err_rating >= 0.8
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555 	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where yearly_production = 1232223
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where yearly_production = 1232223
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555 	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where start_date = "2001-10-10"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where start_date = "2001-10-10"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555 	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where address >= "100 main street"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where address >= "100 main street"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555 	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where phone <= "555-555-5555"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where phone <= "555-555-5555"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555 	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where last_update >= "2016-02-22 12:45:07"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where last_update >= "2016-02-22 12:45:07"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555 	2016-02-22 12:45:07
+PREHOOK: query: DROP TABLE accumulo_index_test
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@accumulo_index_test
+PREHOOK: Output: default@accumulo_index_test
+POSTHOOK: query: DROP TABLE accumulo_index_test
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@accumulo_index_test
+POSTHOOK: Output: default@accumulo_index_test


[2/2] hive git commit: HIVE-15795 : Support Accumulo Index Tables in Hive Accumulo Connector (Mike Fagan, reviewed by Josh Elser)

Posted by se...@apache.org.
HIVE-15795 : Support Accumulo Index Tables in Hive Accumulo Connector (Mike Fagan, reviewed by Josh Elser)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/169e6559
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/169e6559
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/169e6559

Branch: refs/heads/master
Commit: 169e6559286b5a24c0e66b6042ee29da049bc861
Parents: bde6152
Author: sergey <se...@apache.org>
Authored: Thu Apr 20 11:46:36 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Thu Apr 20 11:46:36 2017 -0700

----------------------------------------------------------------------
 accumulo-handler/pom.xml                        |  20 ++
 .../accumulo/AccumuloDefaultIndexScanner.java   | 222 ++++++++++++
 .../hive/accumulo/AccumuloIndexLexicoder.java   | 109 ++++++
 .../hive/accumulo/AccumuloIndexScanner.java     |  56 ++++
 .../accumulo/AccumuloIndexScannerException.java |  39 +++
 .../hive/accumulo/AccumuloStorageHandler.java   | 155 +++++++--
 .../accumulo/mr/AccumuloIndexDefinition.java    |  79 +++++
 .../mr/AccumuloIndexedOutputFormat.java         | 334 +++++++++++++++++++
 .../mr/HiveAccumuloTableOutputFormat.java       |  62 +++-
 .../accumulo/mr/IndexOutputConfigurator.java    |  75 +++++
 .../hadoop/hive/accumulo/mr/package-info.java   |   4 +
 .../predicate/AccumuloPredicateHandler.java     |  87 +++--
 .../predicate/AccumuloRangeGenerator.java       | 123 ++++---
 .../predicate/PrimitiveComparisonFilter.java    |  13 +-
 .../accumulo/serde/AccumuloIndexParameters.java | 100 ++++++
 .../accumulo/serde/AccumuloSerDeParameters.java |  19 ++
 .../hive/accumulo/serde/package-info.java       |   4 +
 .../TestAccumuloDefaultIndexScanner.java        | 218 ++++++++++++
 .../accumulo/TestAccumuloIndexLexicoder.java    | 160 +++++++++
 .../accumulo/TestAccumuloIndexParameters.java   | 112 +++++++
 .../accumulo/TestAccumuloStorageHandler.java    |   3 +
 .../predicate/TestAccumuloPredicateHandler.java |  11 +-
 .../predicate/TestAccumuloRangeGenerator.java   | 201 +++++++++--
 .../src/test/queries/positive/accumulo_index.q  |  44 +++
 .../test/results/positive/accumulo_index.q.out  | 180 ++++++++++
 25 files changed, 2277 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/pom.xml
----------------------------------------------------------------------
diff --git a/accumulo-handler/pom.xml b/accumulo-handler/pom.xml
index 7e51b35..edac1b1 100644
--- a/accumulo-handler/pom.xml
+++ b/accumulo-handler/pom.xml
@@ -62,6 +62,16 @@
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-common</artifactId>
       <version>${project.version}</version>
+        <exclusions>
+            <exclusion>
+                <groupId>org.eclipse.jetty.aggregate</groupId>
+                <artifactId>jetty-all</artifactId>
+            </exclusion>
+            <exclusion>
+                <groupId>org.eclipse.jetty.orbit</groupId>
+                <artifactId>javax.servlet</artifactId>
+            </exclusion>
+        </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
@@ -77,6 +87,16 @@
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-service</artifactId>
       <version>${project.version}</version>
+        <exclusions>
+            <exclusion>
+                <groupId>org.eclipse.jetty.aggregate</groupId>
+                <artifactId>jetty-all</artifactId>
+            </exclusion>
+            <exclusion>
+                <groupId>org.eclipse.jetty.orbit</groupId>
+                <artifactId>javax.servlet</artifactId>
+            </exclusion>
+        </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloDefaultIndexScanner.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloDefaultIndexScanner.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloDefaultIndexScanner.java
new file mode 100644
index 0000000..427a6c7
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloDefaultIndexScanner.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.util.Collections.EMPTY_SET;
+
+/**
+ * This default index scanner expects indexes to be in the same format as Presto's
+ * Accumulo index tables, defined as:
+ * [rowid=field value] [cf=cfname_cqname] [cq=rowid] [visibility] [value=""]
+ * <p>
+ * This handler looks for the following hive serde properties:
+ * 'accumulo.indextable.name' = 'table_idx' (required - name of the corresponding index table)
+ * 'accumulo.indexed.columns' = 'name,age,phone' (optional - comma-separated list of indexed
+ *                      hive columns; if not defined, or defined as '*', all columns are
+ *                      assumed to be indexed)
+ * 'accumulo.index.rows.max' = '20000' (optional - maximum number of index matches to use
+ *                      before converting to a full table scan; default=20000.
+ *                      Note: this setting controls the size of the in-memory list of rowids
+ *                      kept for each search predicate. Using large values for this setting,
+ *                      or having very large rowid values, may require additional memory to
+ *                      prevent out-of-memory errors)
+ * 'accumulo.index.scanner' = 'org.apache.hadoop.hive.accumulo.AccumuloDefaultIndexScanner'
+ *                      (optional - fully qualified class name of the index scanner)
+ * <p>
+ * To implement your own index table scheme, it should be as simple as sub-classing
+ * this class and overriding getIndexRowRanges(), and optionally init() if you need more
+ * config settings.
+ */
+public class AccumuloDefaultIndexScanner implements AccumuloIndexScanner {
+  private static final Logger LOG = LoggerFactory.getLogger(AccumuloDefaultIndexScanner.class);
+
+  private AccumuloConnectionParameters connectParams;
+  private AccumuloIndexParameters indexParams;
+  private int maxRowIds;
+  private Authorizations auths;
+  private String indexTable;
+  private Set<String> indexColumns = EMPTY_SET;
+  private Connector connect;
+  private Map<String, String> colMap;
+
+  /**
+   * Initialize object based on configuration.
+   *
+   * @param conf - Hive configuration
+   */
+  @Override
+  public void init(Configuration conf) {
+    connectParams = new AccumuloConnectionParameters(conf);
+    indexParams = new AccumuloIndexParameters(conf);
+    maxRowIds = indexParams.getMaxIndexRows();
+    auths = indexParams.getTableAuths();
+    indexTable = indexParams.getIndexTable();
+    indexColumns = indexParams.getIndexColumns();
+    colMap = createColumnMap(conf);
+  }
+
+  /**
+   * Get a list of rowid ranges by scanning a column index.
+   *
+   * @param column     - the hive column name
+   * @param indexRange - Key range to scan on the index table
+   * @return List of matching rowid ranges, or null if too many matches are found.
+   * If no index values are found, an empty list is returned so the caller can
+   * short-circuit the query.
+   */
+  @Override
+  public List<Range> getIndexRowRanges(String column, Range indexRange) {
+    List<Range> rowIds = new ArrayList<Range>();
+    Scanner scan = null;
+    String col = this.colMap.get(column);
+
+    if (col != null) {
+
+      try {
+        LOG.debug("Searching tab=" + indexTable + " column=" + column + " range=" + indexRange);
+        Connector conn = getConnector();
+        scan = conn.createScanner(indexTable, auths);
+        scan.setRange(indexRange);
+        Text cf = new Text(col);
+        LOG.debug("Using Column Family=" + toString());
+        scan.fetchColumnFamily(cf);
+
+        for (Map.Entry<Key, Value> entry : scan) {
+
+          rowIds.add(new Range(entry.getKey().getColumnQualifier()));
+
+          // if we have too many results return null for a full scan
+          if (rowIds.size() > maxRowIds) {
+            return null;
+          }
+        }
+
+        // an empty list means the index had no matches for this key range
+        if (rowIds.isEmpty()) {
+          LOG.debug("Found 0 index matches");
+        } else {
+          LOG.debug("Found " + rowIds.size() + " index matches");
+        }
+
+        return rowIds;
+      } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+        LOG.error("Failed to scan index table: " + indexTable, e);
+      } finally {
+        if (scan != null) {
+          scan.close();
+        }
+      }
+    }
+
+    // assume the index is bad and do a full scan
+    LOG.debug("Index lookup failed for table " + indexTable);
+    return null;
+  }
+
+  /**
+   * Test if column is defined in the index table.
+   *
+   * @param column - hive column name
+   * @return true if the column is defined as part of the index table
+   */
+  @Override
+  public boolean isIndexed(String column) {
+    return indexTable != null
+        && (indexColumns.isEmpty() || indexColumns.contains("*")
+        || this.indexColumns.contains(column.toLowerCase())
+        || this.indexColumns.contains(column.toUpperCase()));
+  }
+
+  protected Map<String, String> createColumnMap(Configuration conf) {
+    Map<String, String> colsMap = new HashMap<String, String>();
+    String accColString = conf.get(AccumuloSerDeParameters.COLUMN_MAPPINGS);
+    if (accColString != null && !accColString.trim().isEmpty()) {
+      String[] accCols = accColString.split(",");
+      String[] hiveCols = conf.get(serdeConstants.LIST_COLUMNS).split(",");
+      for (int i = 0; i < accCols.length; i++) {
+        colsMap.put(hiveCols[i], accCols[i].replace(':', '_'));
+      }
+    }
+    return colsMap;
+  }
+
+  protected Connector getConnector() throws AccumuloSecurityException, AccumuloException {
+    if (connect == null) {
+      connect = connectParams.getConnector();
+    }
+    return connect;
+  }
+
+  public void setConnectParams(AccumuloConnectionParameters connectParams) {
+    this.connectParams = connectParams;
+  }
+
+  public AccumuloConnectionParameters getConnectParams() {
+    return connectParams;
+  }
+
+  public AccumuloIndexParameters getIndexParams() {
+    return indexParams;
+  }
+
+  public int getMaxRowIds() {
+    return maxRowIds;
+  }
+
+  public Authorizations getAuths() {
+    return auths;
+  }
+
+  public String getIndexTable() {
+    return indexTable;
+  }
+
+  public Set<String> getIndexColumns() {
+    return indexColumns;
+  }
+
+  public Connector getConnect() {
+    return connect;
+  }
+}
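
A minimal usage sketch (illustrative only, not part of the patch): resolving a
predicate through this scanner, assuming the usual Accumulo connection settings
are already present in the Configuration, and reusing the column mapping from
the accumulo_index.q test table. Imports match the file above.

    Configuration conf = new Configuration();
    conf.set("accumulo.indextable.name", "accumulo_index_idx");
    conf.set("accumulo.indexed.columns", "*");
    conf.set(AccumuloSerDeParameters.COLUMN_MAPPINGS, ":rowID,a:act");
    conf.set(serdeConstants.LIST_COLUMNS, "rowid,active");

    AccumuloIndexScanner scanner = new AccumuloDefaultIndexScanner();
    scanner.init(conf);
    if (scanner.isIndexed("active")) {
      // rowid ranges whose indexed 'active' value is exactly "true";
      // a null result means too many matches, so fall back to a full scan
      List<Range> rowIds = scanner.getIndexRowRanges("active", new Range("true"));
    }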

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexLexicoder.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexLexicoder.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexLexicoder.java
new file mode 100644
index 0000000..6703570
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexLexicoder.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo;
+
+import org.apache.accumulo.core.client.lexicoder.BigIntegerLexicoder;
+import org.apache.accumulo.core.client.lexicoder.DoubleLexicoder;
+import org.apache.accumulo.core.client.lexicoder.IntegerLexicoder;
+import org.apache.accumulo.core.client.lexicoder.LongLexicoder;
+import org.apache.hadoop.hive.serde.serdeConstants;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Utility class to encode index values for accumulo.
+ */
+public final class AccumuloIndexLexicoder {
+  private static final IntegerLexicoder INTEGER_LEXICODER = new IntegerLexicoder();
+  private static final DoubleLexicoder DOUBLE_LEXICODER = new DoubleLexicoder();
+  private static final LongLexicoder LONG_LEXICODER = new LongLexicoder();
+  private static final BigIntegerLexicoder BIG_INTEGER_LEXICODER = new BigIntegerLexicoder();
+  private static final String DIM_PAT = "[(]+.*";
+
+
+  private AccumuloIndexLexicoder() {
+    // hide constructor
+  }
+
+  public static String getRawType(String hiveType) {
+    if (hiveType != null) {
+      return hiveType.toLowerCase().replaceFirst(DIM_PAT, "").trim();
+    }
+    return hiveType;
+  }
+
+  public static byte[] encodeValue(byte[] value, String hiveType, boolean stringEncoded) {
+    if (stringEncoded) {
+      return encodeStringValue(value, hiveType);
+    } else {
+      return encodeBinaryValue(value, hiveType);
+    }
+  }
+
+  public static byte[] encodeStringValue(byte[] value, String hiveType) {
+    String rawType = getRawType(hiveType);
+
+    switch(rawType) {
+      case serdeConstants.BOOLEAN_TYPE_NAME:
+        return Boolean.valueOf(new String(value, UTF_8)).toString().getBytes(UTF_8);
+      case serdeConstants.SMALLINT_TYPE_NAME:
+      case serdeConstants.TINYINT_TYPE_NAME:
+      case serdeConstants.INT_TYPE_NAME:
+        return INTEGER_LEXICODER.encode(Integer.valueOf(new String(value, UTF_8)));
+      case serdeConstants.FLOAT_TYPE_NAME:
+      case serdeConstants.DOUBLE_TYPE_NAME:
+        return DOUBLE_LEXICODER.encode(Double.valueOf(new String(value, UTF_8)));
+      case serdeConstants.BIGINT_TYPE_NAME:
+        return LONG_LEXICODER.encode(Long.valueOf(new String(value, UTF_8)));
+      case serdeConstants.DECIMAL_TYPE_NAME:
+        return BIG_INTEGER_LEXICODER.encode(new BigInteger(new String(value, UTF_8), 10));
+      default :
+        // return the passed in string value
+        return value;
+    }
+  }
+
+  public static byte[] encodeBinaryValue(byte[] value, String hiveType) {
+    String rawType = getRawType(hiveType);
+
+    switch(rawType) {
+      case serdeConstants.BOOLEAN_TYPE_NAME :
+        return String.valueOf(value[0] == 1).getBytes(UTF_8);
+      case serdeConstants.INT_TYPE_NAME :
+        return INTEGER_LEXICODER.encode(ByteBuffer.wrap(value).asIntBuffer().get());
+      case serdeConstants.SMALLINT_TYPE_NAME :
+        return INTEGER_LEXICODER.encode((int)(ByteBuffer.wrap(value).asShortBuffer().get()));
+      case serdeConstants.TINYINT_TYPE_NAME :
+        return INTEGER_LEXICODER.encode((int)value[0]);
+      case serdeConstants.FLOAT_TYPE_NAME :
+        return DOUBLE_LEXICODER.encode((double)ByteBuffer.wrap(value).asFloatBuffer().get());
+      case serdeConstants.DOUBLE_TYPE_NAME :
+        return DOUBLE_LEXICODER.encode(ByteBuffer.wrap(value).asDoubleBuffer().get());
+      case serdeConstants.BIGINT_TYPE_NAME :
+        return LONG_LEXICODER.encode(ByteBuffer.wrap(value).asLongBuffer().get());
+      case serdeConstants.DECIMAL_TYPE_NAME :
+        return BIG_INTEGER_LEXICODER.encode(new BigInteger(value));
+      default :
+        return value;
+    }
+  }
+}
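
Why the lexicoders matter, in one self-contained check (illustrative; assumes
the class above is on the classpath):

    import org.apache.hadoop.io.WritableComparator;
    import static java.nio.charset.StandardCharsets.UTF_8;

    public class LexicoderOrderDemo {
      public static void main(String[] args) {
        byte[] nine = AccumuloIndexLexicoder.encodeStringValue("9".getBytes(UTF_8), "int");
        byte[] ten  = AccumuloIndexLexicoder.encodeStringValue("10".getBytes(UTF_8), "int");
        // Accumulo sorts row keys by unsigned byte comparison; the lexicoder
        // encoding makes that order agree with numeric order
        int cmp = WritableComparator.compareBytes(nine, 0, nine.length, ten, 0, ten.length);
        System.out.println(cmp < 0); // true: encoded 9 sorts before encoded 10,
                                     // though "9" > "10" as plain strings
      }
    }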

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScanner.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScanner.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScanner.java
new file mode 100644
index 0000000..8029f3c
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScanner.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo;
+
+import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+
+/**
+ * Specification for implementing an AccumuloIndexScanner.
+ */
+public interface AccumuloIndexScanner {
+
+  /**
+   * Initialize the index scanner implementation with the runtime configuration.
+   *
+   * @param conf  - the hadoop configuration
+   */
+  void init(Configuration conf);
+
+  /**
+   * Check if column is defined as being indexed.
+   *
+   * @param columnName - the hive column name
+   * @return true if the column is indexed
+   */
+  boolean isIndexed(String columnName);
+
+  /**
+   * Get a list of rowid ranges by scanning a column index.
+   *
+   * @param column     - the hive column name
+   * @param indexRange - Key range to scan on the index table
+   * @return List of matching rowid ranges, or null if too many matches are found
+   *
+   */
+  List<Range> getIndexRowRanges(String column, Range indexRange);
+
+}
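
A hypothetical custom implementation, wired in through the
'accumulo.index.scanner' serde property (class name and behavior are
illustrative only):

    import java.util.List;
    import org.apache.accumulo.core.data.Range;
    import org.apache.hadoop.conf.Configuration;

    public class NoOpIndexScanner implements AccumuloIndexScanner {
      @Override
      public void init(Configuration conf) {
        // read any custom settings from conf here
      }

      @Override
      public boolean isIndexed(String columnName) {
        return false;  // never claim an index, so every query takes a full scan
      }

      @Override
      public List<Range> getIndexRowRanges(String column, Range indexRange) {
        return null;   // null is the contract for "fall back to a full table scan"
      }
    }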

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScannerException.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScannerException.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScannerException.java
new file mode 100644
index 0000000..c50b606
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScannerException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo;
+
+/**
+ * Exception class for AccumuloIndexScanner operations.
+ */
+public class AccumuloIndexScannerException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  public AccumuloIndexScannerException() {
+    super();
+  }
+
+  public AccumuloIndexScannerException(String msg) {
+    super(msg);
+  }
+
+  public AccumuloIndexScannerException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java
index cdbc7f2..62524e8 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java
@@ -1,10 +1,11 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -17,10 +18,6 @@
 
 package org.apache.hadoop.hive.accumulo;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
-
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
@@ -39,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableInputFormat;
 import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableOutputFormat;
 import org.apache.hadoop.hive.accumulo.predicate.AccumuloPredicateHandler;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters;
 import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe;
 import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
@@ -52,13 +50,13 @@ import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
-import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
@@ -66,12 +64,18 @@ import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
 /**
  * Create table mapping to Accumulo for Hive. Handle predicate pushdown if necessary.
  */
 public class AccumuloStorageHandler extends DefaultStorageHandler implements HiveMetaHook,
     HiveStoragePredicateHandler {
-  private static final Logger log = LoggerFactory.getLogger(AccumuloStorageHandler.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AccumuloStorageHandler.class);
   private static final String DEFAULT_PREFIX = "default";
 
   protected AccumuloPredicateHandler predicateHandler = AccumuloPredicateHandler.getInstance();
@@ -88,7 +92,7 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
    *          Properties that will be added to the JobConf by Hive
    */
   @Override
-  public void configureTableJobProperties(TableDesc desc, Map<String,String> jobProps) {
+  public void configureTableJobProperties(TableDesc desc, Map<String, String> jobProps) {
     // Should not be getting invoked, configureInputJobProperties or configureOutputJobProperties
     // should be invoked instead.
     configureInputJobProperties(desc, jobProps);
@@ -119,6 +123,21 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
     }
   }
 
+  protected String getIndexTableName(Table table) {
+    // Use TBLPROPERTIES
+    String idxTableName = table.getParameters().get(AccumuloIndexParameters.INDEXTABLE_NAME);
+
+    if (null != idxTableName) {
+      return idxTableName;
+    }
+
+    // Then try SERDEPROPERTIES
+    idxTableName = table.getSd().getSerdeInfo().getParameters()
+        .get(AccumuloIndexParameters.INDEXTABLE_NAME);
+
+    return idxTableName;
+  }
+
   protected String getTableName(TableDesc tableDesc) {
     Properties props = tableDesc.getProperties();
     String tableName = props.getProperty(AccumuloSerDeParameters.TABLE_NAME);
@@ -135,6 +154,18 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
     return tableName;
   }
 
+  protected String getColumnTypes(TableDesc tableDesc)  {
+    Properties props = tableDesc.getProperties();
+    String columnTypes = props.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+    return columnTypes;
+  }
+
+  protected String getIndexTableName(TableDesc tableDesc) {
+    Properties props = tableDesc.getProperties();
+    String tableName = props.getProperty(AccumuloIndexParameters.INDEXTABLE_NAME);
+    return tableName;
+  }
+
   @Override
   public Configuration getConf() {
     return conf;
@@ -163,7 +194,7 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
   }
 
   @Override
-  public void configureInputJobProperties(TableDesc tableDesc, Map<String,String> jobProperties) {
+  public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
     Properties props = tableDesc.getProperties();
 
     jobProperties.put(AccumuloSerDeParameters.COLUMN_MAPPINGS,
@@ -178,7 +209,7 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
 
     String useIterators = props.getProperty(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY);
     if (useIterators != null) {
-      if (!useIterators.equalsIgnoreCase("true") && !useIterators.equalsIgnoreCase("false")) {
+      if (!"true".equalsIgnoreCase(useIterators) && !"false".equalsIgnoreCase(useIterators)) {
         throw new IllegalArgumentException("Expected value of true or false for "
             + AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY);
       }
@@ -196,15 +227,15 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
       jobProperties.put(AccumuloSerDeParameters.AUTHORIZATIONS_KEY, authValue);
     }
 
-    log.info("Computed input job properties of " + jobProperties);
+    LOG.info("Computed input job properties of " + jobProperties);
   }
 
   @Override
-  public void configureOutputJobProperties(TableDesc tableDesc, Map<String,String> jobProperties) {
+  public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
     Properties props = tableDesc.getProperties();
     // Adding these job properties will make them available to the OutputFormat in checkOutputSpecs
-    jobProperties.put(AccumuloSerDeParameters.COLUMN_MAPPINGS,
-        props.getProperty(AccumuloSerDeParameters.COLUMN_MAPPINGS));
+    String colMap = props.getProperty(AccumuloSerDeParameters.COLUMN_MAPPINGS);
+    jobProperties.put(AccumuloSerDeParameters.COLUMN_MAPPINGS, colMap);
 
     String tableName = props.getProperty(AccumuloSerDeParameters.TABLE_NAME);
     if (null == tableName) {
@@ -212,6 +243,19 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
     }
     jobProperties.put(AccumuloSerDeParameters.TABLE_NAME, tableName);
 
+    String indexTable = props.getProperty(AccumuloIndexParameters.INDEXTABLE_NAME);
+    if (null == indexTable) {
+      indexTable = getIndexTableName(tableDesc);
+    }
+
+    if (null != indexTable) {
+      jobProperties.put(AccumuloIndexParameters.INDEXTABLE_NAME, indexTable);
+
+      String indexColumns = props.getProperty(AccumuloIndexParameters.INDEXED_COLUMNS);
+      jobProperties.put(AccumuloIndexParameters.INDEXED_COLUMNS,
+          getIndexedColFamQuals(tableDesc, indexColumns, colMap));
+    }
+
     if (props.containsKey(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE)) {
       jobProperties.put(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE,
           props.getProperty(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE));
@@ -223,6 +267,42 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
     }
   }
 
+  private String getIndexedColFamQuals(TableDesc tableDesc, String indexColumns, String colMap) {
+    StringBuilder sb = new StringBuilder();
+
+    String cols = indexColumns;
+
+    String hiveColString = tableDesc.getProperties().getProperty(serdeConstants.LIST_COLUMNS);
+    // if no index columns are explicitly defined (or '*' is used), index every
+    // hive column except the leading rowid column
+    if (cols == null || cols.isEmpty() || "*".equals(indexColumns)) {
+      // skip rowid
+      cols = hiveColString.substring(hiveColString.indexOf(',') + 1);
+    }
+
+    String[] hiveTypes = tableDesc.getProperties()
+        .getProperty(serdeConstants.LIST_COLUMN_TYPES).split(":");
+    String[] accCols = colMap.split(",");
+    String[] hiveCols = hiveColString.split(",");
+    Set<String> indexSet = new HashSet<String>();
+
+    for (String idx : cols.split(",")) {
+      indexSet.add(idx.trim());
+    }
+
+    for (int i = 0; i < hiveCols.length; i++) {
+      if (indexSet.contains(hiveCols[i].trim())) {
+        if (sb.length() > 0) {
+          sb.append(",");
+        }
+        sb.append(accCols[i].trim() + ":" + AccumuloIndexLexicoder.getRawType(hiveTypes[i]));
+      }
+    }
+
+    return sb.toString();
+  }
+
   @SuppressWarnings("rawtypes")
   @Override
   public Class<? extends InputFormat> getInputFormatClass() {
@@ -242,7 +322,7 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
       throw new MetaException("Location can't be specified for Accumulo");
     }
 
-    Map<String,String> serdeParams = table.getSd().getSerdeInfo().getParameters();
+    Map<String, String> serdeParams = table.getSd().getSerdeInfo().getParameters();
     String columnMapping = serdeParams.get(AccumuloSerDeParameters.COLUMN_MAPPINGS);
     if (columnMapping == null) {
       throw new MetaException(AccumuloSerDeParameters.COLUMN_MAPPINGS
@@ -268,6 +348,16 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
               + " already exists in Accumulo. Use CREATE EXTERNAL TABLE to register with Hive.");
         }
       }
+
+      String idxTable = getIndexTableName(table);
+
+      if (idxTable != null && !idxTable.isEmpty()) {
+
+        // create the index table if it does not exist
+        if (!tableOpts.exists(idxTable)) {
+          tableOpts.create(idxTable);
+        }
+      }
     } catch (AccumuloSecurityException e) {
       throw new MetaException(StringUtils.stringifyException(e));
     } catch (TableExistsException e) {
@@ -336,7 +426,7 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
     if (serDe.getIteratorPushdown()) {
       return predicateHandler.decompose(conf, desc);
     } else {
-      log.info("Set to ignore Accumulo iterator pushdown, skipping predicate handler.");
+      LOG.info("Set to ignore Accumulo iterator pushdown, skipping predicate handler.");
       return null;
     }
   }
@@ -348,22 +438,24 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
       Utils.addDependencyJars(jobConf, Tracer.class, Fate.class, Connector.class, Main.class,
           ZooKeeper.class, AccumuloStorageHandler.class);
     } catch (IOException e) {
-      log.error("Could not add necessary Accumulo dependencies to classpath", e);
+      LOG.error("Could not add necessary Accumulo dependencies to classpath", e);
     }
 
     Properties tblProperties = tableDesc.getProperties();
     AccumuloSerDeParameters serDeParams = null;
     try {
-      serDeParams = new AccumuloSerDeParameters(jobConf, tblProperties, AccumuloSerDe.class.getName());
+      serDeParams =
+          new AccumuloSerDeParameters(jobConf, tblProperties, AccumuloSerDe.class.getName());
     } catch (SerDeException e) {
-      log.error("Could not instantiate AccumuloSerDeParameters", e);
+      LOG.error("Could not instantiate AccumuloSerDeParameters", e);
       return;
     }
 
     try {
       serDeParams.getRowIdFactory().addDependencyJars(jobConf);
     } catch (IOException e) {
-      log.error("Could not add necessary dependencies for " + serDeParams.getRowIdFactory().getClass(), e);
+      LOG.error("Could not add necessary dependencies for "
+          + serDeParams.getRowIdFactory().getClass(), e);
     }
 
     // When Kerberos is enabled, we have to add the Accumulo delegation token to the
@@ -383,25 +475,26 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
               connectionParams.getAccumuloUserName(), token);
         } catch (IllegalStateException e) {
           // The implementation balks when this method is invoked multiple times
-          log.debug("Ignoring IllegalArgumentException about re-setting connector information");
+          LOG.debug("Ignoring IllegalArgumentException about re-setting connector information");
         }
         try {
           OutputConfigurator.setConnectorInfo(AccumuloOutputFormat.class, jobConf,
               connectionParams.getAccumuloUserName(), token);
         } catch (IllegalStateException e) {
           // The implementation balks when this method is invoked multiple times
-          log.debug("Ignoring IllegalArgumentException about re-setting connector information");
+          LOG.debug("Ignoring IllegalArgumentException about re-setting connector information");
         }
 
         // Convert the Accumulo token in a Hadoop token
         Token<? extends TokenIdentifier> accumuloToken = helper.getHadoopToken(token);
 
-        log.info("Adding Hadoop Token for Accumulo to Job's Credentials");
+        LOG.info("Adding Hadoop Token for Accumulo to Job's Credentials");
 
         // Add the Hadoop token to the JobConf
         helper.mergeTokenIntoJobConf(jobConf, accumuloToken);
       } catch (Exception e) {
-        throw new RuntimeException("Failed to obtain DelegationToken for " + connectionParams.getAccumuloUserName(), e);
+        throw new RuntimeException("Failed to obtain DelegationToken for "
+            + connectionParams.getAccumuloUserName(), e);
       }
     }
   }
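
As a worked example of getIndexedColFamQuals() above: for the accumulo_index.q
test table (column mapping ':rowID,a:act,a:off,a:per,a:mhs,a:shs,a:eff,a:err,
a:yp,a:sd,a:addr,a:ph,a:lu' with 'accumulo.indexed.columns' = '*'), the value
handed to the output format should come out as the following single
comma-separated string (wrapped here for readability), with the varchar(100)
and char(13) dimensions stripped by getRawType():

    a:act:boolean,a:off:tinyint,a:per:smallint,a:mhs:int,a:shs:bigint,
    a:eff:float,a:err:double,a:yp:decimal,a:sd:date,a:addr:varchar,
    a:ph:char,a:lu:timestamp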

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexDefinition.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexDefinition.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexDefinition.java
new file mode 100644
index 0000000..51531d6
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexDefinition.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo.mr;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Index table definition.
+ */
+public class AccumuloIndexDefinition {
+  private final String baseTable;
+  private final String indexTable;
+  private final Map<String, String> colMap;
+
+
+  public AccumuloIndexDefinition(String baseTable, String indexTable) {
+    this.colMap = new HashMap<String, String>();
+    this.baseTable = baseTable;
+    this.indexTable = indexTable;
+  }
+
+  public String getBaseTable() {
+    return baseTable;
+  }
+
+
+  public String getIndexTable() {
+    return indexTable;
+  }
+
+  public void addIndexCol(String cf, String cq, String colType) {
+    colMap.put(encode(cf, cq), colType);
+  }
+
+  public Map<String, String> getColumnMap() {
+    return colMap;
+  }
+
+  public void setColumnTuples(String columns) {
+    if (columns != null) {
+      String cols = columns.trim();
+      if (!cols.isEmpty() && !"*".equals(cols)) {
+        for (String col : cols.split(",")) {
+          String[] cfcqtp = col.trim().split(":");
+          addIndexCol(cfcqtp[0], cfcqtp[1], cfcqtp[2]);
+        }
+      }
+    }
+  }
+
+  public boolean contains(String cf, String cq) {
+    return colMap.containsKey(encode(cf, cq));
+  }
+
+  public String getColType(String cf, String cq) {
+    return colMap.get(encode(cf, cq));
+  }
+
+  private String encode(String cf, String cq) {
+    return cq + ":" + cq;
+  }
+}
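
Usage sketch (illustrative): column tuples take the form
'columnFamily:columnQualifier:hiveType', matching what
AccumuloStorageHandler.getIndexedColFamQuals() produces.

    AccumuloIndexDefinition def =
        new AccumuloIndexDefinition("accumulo_index_test", "accumulo_index_idx");
    def.setColumnTuples("a:act:boolean,a:off:tinyint");
    def.contains("a", "act");    // true
    def.getColType("a", "off");  // "tinyint"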

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexedOutputFormat.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexedOutputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexedOutputFormat.java
new file mode 100644
index 0000000..a055233
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexedOutputFormat.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo.mr;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.accumulo.AccumuloIndexLexicoder;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Extension of AccumuloOutputFormat to support indexing.
+ */
+public class AccumuloIndexedOutputFormat extends AccumuloOutputFormat {
+  private static final Logger LOG = Logger.getLogger(AccumuloIndexedOutputFormat.class);
+  private static final Class<?> CLASS = AccumuloOutputFormat.class;
+  private static final byte[] EMPTY_BYTES = new byte[0];
+
+  public static void setIndexTableName(JobConf job, String tableName) {
+    IndexOutputConfigurator.setIndexTableName(CLASS, job, tableName);
+  }
+
+  protected static String getIndexTableName(JobConf job) {
+    return IndexOutputConfigurator.getIndexTableName(CLASS, job);
+  }
+
+  public static void setIndexColumns(JobConf job, String fields) {
+    IndexOutputConfigurator.setIndexColumns(CLASS, job, fields);
+  }
+
+  protected static String getIndexColumns(JobConf job) {
+    return IndexOutputConfigurator.getIndexColumns(CLASS, job);
+  }
+
+  public static void setStringEncoding(JobConf job, Boolean isStringEncoding) {
+    IndexOutputConfigurator.setRecordEncoding(CLASS, job, isStringEncoding);
+  }
+
+  protected static Boolean getStringEncoding(JobConf job) {
+    return IndexOutputConfigurator.getRecordEncoding(CLASS, job);
+  }
+
+  public RecordWriter<Text, Mutation> getRecordWriter(FileSystem ignored, JobConf job,
+                                           String name, Progressable progress) throws IOException {
+    try {
+      return new AccumuloIndexedOutputFormat.AccumuloRecordWriter(job);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  protected static class AccumuloRecordWriter implements RecordWriter<Text, Mutation> {
+    private MultiTableBatchWriter mtbw = null;
+    private Map<Text, BatchWriter> bws = null;
+    private Text defaultTableName = null;
+    private Text indexTableName = null;
+    private boolean simulate = false;
+    private boolean createTables = false;
+    private boolean isStringEncoded = true;
+    private long mutCount = 0L;
+    private long valCount = 0L;
+    private Connector conn;
+    private AccumuloIndexDefinition indexDef = null;
+
+    protected AccumuloRecordWriter(JobConf job)
+        throws AccumuloException, AccumuloSecurityException, IOException {
+      Level l = AccumuloIndexedOutputFormat.getLogLevel(job);
+      if (l != null) {
+        LOG.setLevel(l);
+      }
+      this.isStringEncoded = AccumuloIndexedOutputFormat.getStringEncoding(job).booleanValue();
+      this.simulate = AccumuloIndexedOutputFormat.getSimulationMode(job).booleanValue();
+      this.createTables = AccumuloIndexedOutputFormat.canCreateTables(job).booleanValue();
+      if (this.simulate) {
+        LOG.info("Simulating output only. No writes to tables will occur");
+      }
+
+      this.bws = new HashMap<Text, BatchWriter>();
+      String tname = AccumuloIndexedOutputFormat.getDefaultTableName(job);
+      this.defaultTableName = tname == null ? null : new Text(tname);
+
+      String iname = AccumuloIndexedOutputFormat.getIndexTableName(job);
+      if (iname != null) {
+        LOG.info("Index Table = " + iname);
+        this.indexTableName = new Text(iname);
+        this.indexDef = createIndexDefinition(job, tname, iname);
+      }
+      if (!this.simulate) {
+        this.conn = AccumuloIndexedOutputFormat.getInstance(job)
+            .getConnector(AccumuloIndexedOutputFormat.getPrincipal(job),
+                          AccumuloIndexedOutputFormat.getAuthenticationToken(job));
+        this.mtbw = this.conn.createMultiTableBatchWriter(
+            AccumuloIndexedOutputFormat.getBatchWriterOptions(job));
+      }
+    }
+
+    AccumuloIndexDefinition createIndexDefinition(JobConf job, String tname, String iname) {
+      AccumuloIndexDefinition def = new AccumuloIndexDefinition(tname, iname);
+      String cols = AccumuloIndexedOutputFormat.getIndexColumns(job);
+      LOG.info("Index Cols = " + cols);
+      def.setColumnTuples(cols);
+      return def;
+    }
+
+    public void write(Text table, Mutation mutation) throws IOException {
+      if (table == null || table.toString().isEmpty()) {
+        table = this.defaultTableName;
+      }
+
+      if (!this.simulate && table == null) {
+        throw new IOException("No table or default table specified. Try simulation mode next time");
+      } else {
+        ++this.mutCount;
+        this.valCount += (long) mutation.size();
+        this.printMutation(table, mutation);
+        if (!this.simulate) {
+          if (!this.bws.containsKey(table)) {
+            try {
+              this.addTable(table);
+            } catch (Exception e) {
+              LOG.error(e);
+              throw new IOException(e);
+            }
+          }
+          if (indexTableName != null && !this.bws.containsKey(indexTableName)) {
+            try {
+              this.addTable(indexTableName);
+            } catch (Exception e) {
+              LOG.error(e);
+              throw new IOException(e);
+            }
+          }
+
+          try {
+            this.bws.get(table).addMutation(mutation);
+          } catch (MutationsRejectedException e) {
+            throw new IOException(e);
+          }
+
+          // if this table has an associated index table then attempt to build
+          // index mutations
+          if (indexTableName != null) {
+            List<Mutation> idxMuts = getIndexMutations(mutation);
+            if (!idxMuts.isEmpty()) {
+              try {
+                BatchWriter writer = this.bws.get(indexTableName);
+                for (Mutation m : idxMuts) {
+                  writer.addMutation(m);
+                }
+              } catch (MutationsRejectedException var4) {
+                throw new IOException(var4);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    public void addTable(Text tableName) throws AccumuloException, AccumuloSecurityException {
+      if (this.simulate) {
+        LOG.info("Simulating adding table: " + tableName);
+      } else {
+        LOG.debug("Adding table: " + tableName);
+        BatchWriter bw = null;
+        String table = tableName.toString();
+        if (this.createTables && !this.conn.tableOperations().exists(table)) {
+          try {
+            this.conn.tableOperations().create(table);
+          } catch (AccumuloSecurityException e) {
+            LOG.error("Accumulo security violation creating " + table, e);
+            throw e;
+          } catch (TableExistsException e) {
+            LOG.warn("Table Exists " + table, e);
+          }
+        }
+
+        try {
+          bw = this.mtbw.getBatchWriter(table);
+        } catch (TableNotFoundException e) {
+          LOG.error("Accumulo table " + table + " doesn't exist and cannot be created.", e);
+          throw new AccumuloException(e);
+        }
+
+        if (bw != null) {
+          this.bws.put(tableName, bw);
+        }
+      }
+    }
+
+    private int printMutation(Text table, Mutation m) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(String.format("Table %s row key: %s", table, this.hexDump(m.getRow())));
+        for (ColumnUpdate cu : m.getUpdates()) {
+          LOG.trace(String.format("Table %s column: %s:%s", table,
+              this.hexDump(cu.getColumnFamily()), this.hexDump(cu.getColumnQualifier())));
+          LOG.trace(String.format("Table %s security: %s", table,
+              new ColumnVisibility(cu.getColumnVisibility())));
+          LOG.trace(String.format("Table %s value: %s", table, this.hexDump(cu.getValue())));
+        }
+      }
+
+      return m.getUpdates().size();
+    }
+
+    private List<Mutation> getIndexMutations(Mutation baseMut) {
+      List<Mutation> indexMuts = new ArrayList<Mutation>();
+
+      // nothing to do if there is no index definition for this table
+      if (null != indexDef) {
+        byte[] rowId = baseMut.getRow();
+
+        for (ColumnUpdate cu : baseMut.getUpdates()) {
+          String cf = new String(cu.getColumnFamily());
+          String cq = new String(cu.getColumnQualifier());
+
+          // if this columnFamily/columnQualifier pair is indexed, build a new mutation:
+          // row=encoded value, cf=columnFamily_columnQualifier, cq=rowKey,
+          // cv=columnVisibility, value=[]
+          String colType = indexDef.getColType(cf, cq);
+          if (colType != null) {
+            LOG.trace(String.format("Building index for column %s:%s", cf, cq));
+            Mutation m = new Mutation(AccumuloIndexLexicoder.encodeValue(cu.getValue(), colType,
+                                               isStringEncoded));
+            String colFam = cf + "_" + cq;
+            m.put(colFam.getBytes(), rowId, new ColumnVisibility(cu.getColumnVisibility()),
+                  EMPTY_BYTES);
+            indexMuts.add(m);
+          }
+        }
+      }
+      return indexMuts;
+    }
+
+    private String hexDump(byte[] ba) {
+      StringBuilder sb = new StringBuilder();
+      for (byte b : ba) {
+        // append printable ASCII characters as-is, everything else as hex
+        if (b > 32 && b < 126) {
+          sb.append((char) b);
+        } else {
+          sb.append(String.format("x%02x", b));
+        }
+      }
+
+      return sb.toString();
+    }
+
+    public void close(Reporter reporter) throws IOException {
+      LOG.debug("mutations written: " + this.mutCount + ", values written: " + this.valCount);
+      if (!this.simulate) {
+        try {
+          this.mtbw.close();
+        } catch (MutationsRejectedException e) {
+          if (e.getAuthorizationFailuresMap().size() > 0) {
+            // group the authorization failure codes by table id for the error message
+            Map tables = new HashMap();
+            for (Iterator itr = e.getAuthorizationFailuresMap().entrySet().iterator();
+                itr.hasNext();) {
+              Map.Entry ke = (Map.Entry) itr.next();
+              String tableId = ((KeyExtent) ke.getKey()).getTableId().toString();
+              Set secCodes = (Set) tables.get(tableId);
+              if (secCodes == null) {
+                secCodes = new HashSet();
+                tables.put(tableId, secCodes);
+              }
+              secCodes.addAll((Collection) ke.getValue());
+            }
+
+            LOG.error("Not authorized to write to tables : " + tables);
+          }
+
+          if (e.getConstraintViolationSummaries().size() > 0) {
+            LOG.error("Constraint violations : " + e.getConstraintViolationSummaries().size());
+          }
+          throw new IOException(e);
+        }
+      }
+    }
+  }
+}
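
For intuition, the index layout produced by getIndexMutations() above inverts each
indexed cell: the encoded cell value becomes the index row, "family_qualifier"
becomes the column family, and the base row id becomes the column qualifier. A
minimal sketch of the transformation (row, column, and value names are
hypothetical; assumes a string value whose encoding is its own bytes):

    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.security.ColumnVisibility;

    // base table cell:  row="row1", cf="person", cq="name", value="bob"
    byte[] rowId = "row1".getBytes();
    byte[] encodedValue = "bob".getBytes();

    // index table cell: row="bob", cf="person_name", cq="row1", value=[]
    Mutation indexMut = new Mutation(encodedValue);
    indexMut.put("person_name".getBytes(), rowId, new ColumnVisibility(), new byte[0]);

A point lookup on the index table for an encoded value thus yields the base-table
row ids containing that value, which is what the index scanner exploits at query time.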

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
index 3ae5431..bfa764a 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
@@ -1,10 +1,11 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -14,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hive.accumulo.mr;
 
 import java.io.IOException;
@@ -27,8 +29,11 @@ import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters;
+import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters;
 import org.apache.hadoop.hive.accumulo.HiveAccumuloHelper;
 import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordWriter;
@@ -42,7 +47,7 @@ import com.google.common.base.Preconditions;
 /**
  *
  */
-public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat {
+public class HiveAccumuloTableOutputFormat extends AccumuloIndexedOutputFormat {
 
   protected final HiveAccumuloHelper helper = new HiveAccumuloHelper();
 
@@ -54,7 +59,8 @@ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat {
   }
 
   @Override
-  public RecordWriter<Text,Mutation> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
+  public RecordWriter<Text, Mutation> getRecordWriter(FileSystem ignored, JobConf job, String name,
+                                                     Progressable progress) throws IOException {
     configureAccumuloOutputFormat(job);
 
     return super.getRecordWriter(ignored, job, name, progress);
@@ -117,6 +123,16 @@ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat {
 
       // Set the table where we're writing this data
       setDefaultAccumuloTableName(job, tableName);
+
+      // Set the index table information
+      final String indexTableName = job.get(AccumuloIndexParameters.INDEXTABLE_NAME);
+      final String indexedColumns = job.get(AccumuloIndexParameters.INDEXED_COLUMNS);
+      final String columnTypes = job.get(serdeConstants.LIST_COLUMN_TYPES);
+      final boolean binaryEncoding = ColumnEncoding.BINARY.getName()
+          .equalsIgnoreCase(job.get(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE));
+      setAccumuloIndexTableName(job, indexTableName);
+      setAccumuloIndexColumns(job, indexedColumns);
+      setAccumuloStringEncoding(job, !binaryEncoding);
     } catch (AccumuloSecurityException e) {
       log.error("Could not connect to Accumulo with provided credentials", e);
       throw new IOException(e);
@@ -125,10 +141,10 @@ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat {
 
   // Non-static methods to wrap the static AccumuloOutputFormat methods to enable testing
 
-  protected void setConnectorInfoWithErrorChecking(JobConf conf, String username, AuthenticationToken token)
-      throws AccumuloSecurityException {
+  protected void setConnectorInfoWithErrorChecking(JobConf conf, String username,
+                                     AuthenticationToken token) throws AccumuloSecurityException {
     try {
-      AccumuloOutputFormat.setConnectorInfo(conf, username, token);
+      AccumuloIndexedOutputFormat.setConnectorInfo(conf, username, token);
     } catch (IllegalStateException e) {
       // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
       log.debug("Ignoring exception setting Accumulo Connector instance for user " + username, e);
@@ -136,8 +152,8 @@ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat {
   }
 
   @SuppressWarnings("deprecation")
-  protected void setZooKeeperInstanceWithErrorChecking(JobConf conf, String instanceName, String zookeepers,
-      boolean isSasl) throws IOException {
+  protected void setZooKeeperInstanceWithErrorChecking(JobConf conf, String instanceName,
+                                           String zookeepers, boolean isSasl) throws IOException {
     try {
       if (isSasl) {
         // Reflection to support Accumulo 1.5. Remove when Accumulo 1.5 support is dropped
@@ -146,7 +162,7 @@ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat {
         getHelper().setZooKeeperInstance(conf, AccumuloOutputFormat.class, zookeepers, instanceName,
             isSasl);
       } else {
-        AccumuloOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers);
+        AccumuloIndexedOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers);
       }
     } catch (IllegalStateException ise) {
       // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
@@ -157,7 +173,7 @@ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat {
 
   protected void setMockInstanceWithErrorChecking(JobConf conf, String instanceName) {
     try {
-      AccumuloOutputFormat.setMockInstance(conf, instanceName);
+      AccumuloIndexedOutputFormat.setMockInstance(conf, instanceName);
     } catch (IllegalStateException e) {
       // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
       log.debug("Ignoring exception setting mock instance of " + instanceName, e);
@@ -165,7 +181,19 @@ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat {
   }
 
   protected void setDefaultAccumuloTableName(JobConf conf, String tableName) {
-    AccumuloOutputFormat.setDefaultTableName(conf, tableName);
+    AccumuloIndexedOutputFormat.setDefaultTableName(conf, tableName);
+  }
+
+  protected void setAccumuloIndexTableName(JobConf conf, String indexTableName) {
+    AccumuloIndexedOutputFormat.setIndexTableName(conf, indexTableName);
+  }
+
+  protected void setAccumuloIndexColumns(JobConf conf, String indexColumns) {
+    AccumuloIndexedOutputFormat.setIndexColumns(conf, indexColumns);
+  }
+
+  protected void setAccumuloStringEncoding(JobConf conf, Boolean isStringEncoded) {
+    AccumuloIndexedOutputFormat.setStringEncoding(conf, isStringEncoded);
   }
 
   HiveAccumuloHelper getHelper() {

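The index settings above are read straight from the JobConf, so a job writing
through this output format only needs the corresponding properties set before
configureAccumuloOutputFormat() runs. A minimal sketch using the property-name
constants defined in AccumuloIndexParameters (table and column names hypothetical):

    import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters;
    import org.apache.hadoop.mapred.JobConf;

    JobConf job = new JobConf();
    job.set(AccumuloIndexParameters.INDEXTABLE_NAME, "person_idx");
    job.set(AccumuloIndexParameters.INDEXED_COLUMNS, "name,age");

    // configureAccumuloOutputFormat(job) then forwards these values to
    // AccumuloIndexedOutputFormat via the setAccumuloIndex* wrappers above.
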
http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/IndexOutputConfigurator.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/IndexOutputConfigurator.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/IndexOutputConfigurator.java
new file mode 100644
index 0000000..98294bb
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/IndexOutputConfigurator.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo.mr;
+
+import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Extension of OutputConfigurator to support indexing.
+ */
+public class IndexOutputConfigurator extends OutputConfigurator {
+  /**
+   * Accumulo Write options.
+   */
+  public enum WriteOpts {
+    DEFAULT_TABLE_NAME,
+    INDEX_TABLE_NAME,
+    INDEX_COLUMNS,
+    COLUMN_TYPES,
+    BINARY_ENCODING,
+    BATCH_WRITER_CONFIG
+  }
+
+  public static void setIndexTableName(Class<?> implementingClass, Configuration conf,
+                                       String tableName) {
+    if (tableName != null) {
+      conf.set(enumToConfKey(implementingClass, WriteOpts.INDEX_TABLE_NAME), tableName);
+    }
+  }
+
+  public static String getIndexTableName(Class<?> implementingClass, Configuration conf) {
+    return conf.get(enumToConfKey(implementingClass, WriteOpts.INDEX_TABLE_NAME));
+  }
+
+  public static void setIndexColumns(Class<?> implementingClass, Configuration conf,
+                                     String columns) {
+    if (columns != null) {
+      conf.set(enumToConfKey(implementingClass, WriteOpts.INDEX_COLUMNS), columns);
+    }
+  }
+
+  public static String getIndexColumns(Class<?> implementingClass, Configuration conf) {
+    return conf.get(enumToConfKey(implementingClass, WriteOpts.INDEX_COLUMNS));
+  }
+
+  public static void setRecordEncoding(Class<?> implementingClass, Configuration conf,
+                                       Boolean isBinary) {
+    conf.set(enumToConfKey(implementingClass, WriteOpts.BINARY_ENCODING), isBinary.toString());
+  }
+
+  public static Boolean getRecordEncoding(Class<?> implementingClass, Configuration conf) {
+    return Boolean.valueOf(conf.get(enumToConfKey(implementingClass, WriteOpts.BINARY_ENCODING)));
+  }
+
+}
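
As with Accumulo's other configurators, each option is stored under a key derived
from the implementing class and the enum constant (via enumToConfKey), so values
set for one output format class are not visible to another. A small usage sketch
(hypothetical table name):

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    IndexOutputConfigurator.setIndexTableName(
        AccumuloIndexedOutputFormat.class, conf, "person_idx");

    // round-trips only when the same implementing class is supplied
    String idxTable = IndexOutputConfigurator.getIndexTableName(
        AccumuloIndexedOutputFormat.class, conf);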

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/package-info.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/package-info.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/package-info.java
new file mode 100644
index 0000000..599b1ea
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * MapReduce and supporting classes.
+ */
+package org.apache.hadoop.hive.accumulo.mr;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
index a7ec7c5..718a5c5 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
@@ -1,10 +1,11 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -29,6 +30,7 @@ import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.data.Range;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
 import org.apache.hadoop.hive.accumulo.columns.ColumnMapper;
 import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping;
 import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
@@ -86,13 +88,13 @@ public class AccumuloPredicateHandler {
   private static final List<Range> TOTAL_RANGE = Collections.singletonList(new Range());
 
   private static AccumuloPredicateHandler handler = new AccumuloPredicateHandler();
-  private static Map<String,Class<? extends CompareOp>> compareOps = Maps.newHashMap();
-  private static Map<String,Class<? extends PrimitiveComparison>> pComparisons = Maps.newHashMap();
+  private static Map<String, Class<? extends CompareOp>> compareOps = Maps.newHashMap();
+  private static Map<String, Class<? extends PrimitiveComparison>> pComparisons = Maps.newHashMap();
 
   // Want to start sufficiently "high" enough in the iterator stack
   private static int iteratorCount = 50;
 
-  private static final Logger log = LoggerFactory.getLogger(AccumuloPredicateHandler.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AccumuloPredicateHandler.class);
   static {
     compareOps.put(GenericUDFOPEqual.class.getName(), Equal.class);
     compareOps.put(GenericUDFOPNotEqual.class.getName(), NotEqual.class);
@@ -136,8 +138,9 @@ public class AccumuloPredicateHandler {
    */
   public Class<? extends CompareOp> getCompareOpClass(String udfType)
       throws NoSuchCompareOpException {
-    if (!compareOps.containsKey(udfType))
+    if (!compareOps.containsKey(udfType)) {
       throw new NoSuchCompareOpException("Null compare op for specified key: " + udfType);
+    }
     return compareOps.get(udfType);
   }
 
@@ -167,9 +170,10 @@ public class AccumuloPredicateHandler {
    */
   public Class<? extends PrimitiveComparison> getPrimitiveComparisonClass(String type)
       throws NoSuchPrimitiveComparisonException {
-    if (!pComparisons.containsKey(type))
+    if (!pComparisons.containsKey(type)) {
       throw new NoSuchPrimitiveComparisonException("Null primitive comparison for specified key: "
           + type);
+    }
     return pComparisons.get(type);
   }
 
@@ -196,7 +200,8 @@ public class AccumuloPredicateHandler {
   /**
    * Loop through search conditions and build ranges for predicates involving rowID column, if any.
    */
-  public List<Range> getRanges(Configuration conf, ColumnMapper columnMapper) throws SerDeException {
+  public List<Range> getRanges(Configuration conf, ColumnMapper columnMapper)
+      throws SerDeException {
     if (!columnMapper.hasRowIdMapping()) {
       return TOTAL_RANGE;
     }
@@ -218,16 +223,16 @@ public class AccumuloPredicateHandler {
       return TOTAL_RANGE;
     }
 
-    Object result = generateRanges(columnMapper, hiveRowIdColumnName, root);
+    Object result = generateRanges(conf, columnMapper, hiveRowIdColumnName, root);
 
     if (null == result) {
-      log.info("Calculated null set of ranges, scanning full table");
+      LOG.info("Calculated null set of ranges, scanning full table");
       return TOTAL_RANGE;
     } else if (result instanceof Range) {
-      log.info("Computed a single Range for the query: " + result);
+      LOG.info("Computed a single Range for the query: " + result);
       return Collections.singletonList((Range) result);
     } else if (result instanceof List) {
-      log.info("Computed a collection of Ranges for the query: " + result);
+      LOG.info("Computed a collection of Ranges for the query: " + result);
       @SuppressWarnings("unchecked")
       List<Range> ranges = (List<Range>) result;
       return ranges;
@@ -237,9 +242,11 @@ public class AccumuloPredicateHandler {
   }
 
   /**
-   * Encapsulates the traversal over some {@link ExprNodeDesc} tree for the generation of Accumuluo
-   * Ranges using expressions involving the Accumulo rowid-mapped Hive column
+   * Encapsulates the traversal over some {@link ExprNodeDesc} tree for the generation of Accumulo
+   * Ranges using expressions involving the Accumulo rowid-mapped Hive column.
    *
+   * @param conf
+   *          Hadoop configuration
    * @param columnMapper
    *          Mapping of Hive to Accumulo columns for the query
    * @param hiveRowIdColumnName
@@ -249,15 +256,16 @@ public class AccumuloPredicateHandler {
    * @return An object representing the result from the ExprNodeDesc tree traversal using the
    *         AccumuloRangeGenerator
    */
-  protected Object generateRanges(ColumnMapper columnMapper, String hiveRowIdColumnName, ExprNodeDesc root) {
-    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler,
+  protected Object generateRanges(Configuration conf, ColumnMapper columnMapper,
+                                  String hiveRowIdColumnName, ExprNodeDesc root) {
+    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler,
         columnMapper.getRowIdMapping(), hiveRowIdColumnName);
     Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
-        Collections.<Rule,NodeProcessor> emptyMap(), null);
+        Collections.<Rule, NodeProcessor> emptyMap(), null);
     GraphWalker ogw = new DefaultGraphWalker(disp);
-    ArrayList<Node> roots = new ArrayList<Node>();
+    List<Node> roots = new ArrayList<Node>();
     roots.add(root);
-    HashMap<Node,Object> nodeOutput = new HashMap<Node,Object>();
+    HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
 
     try {
       ogw.startWalking(roots, nodeOutput);
@@ -282,10 +290,13 @@ public class AccumuloPredicateHandler {
     boolean shouldPushdown = conf.getBoolean(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY,
         AccumuloSerDeParameters.ITERATOR_PUSHDOWN_DEFAULT);
     if (!shouldPushdown) {
-      log.info("Iterator pushdown is disabled for this table");
+      LOG.info("Iterator pushdown is disabled for this table");
       return itrs;
     }
 
+    boolean binaryEncodedRow = ColumnEncoding.BINARY.getName()
+        .equalsIgnoreCase(conf.get(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE));
+
     int rowIdOffset = columnMapper.getRowIdOffset();
     String[] hiveColumnNamesArr = conf.getStrings(serdeConstants.LIST_COLUMNS);
 
@@ -306,11 +317,12 @@ public class AccumuloPredicateHandler {
       if (hiveRowIdColumnName == null || !hiveRowIdColumnName.equals(col)) {
         HiveAccumuloColumnMapping mapping = (HiveAccumuloColumnMapping) columnMapper
             .getColumnMappingForHiveColumn(hiveColumnNames, col);
-        itrs.add(toSetting(mapping, sc));
+        itrs.add(toSetting(mapping, sc, binaryEncodedRow));
       }
     }
-    if (log.isInfoEnabled())
-      log.info("num iterators = " + itrs.size());
+    if (LOG.isInfoEnabled()) {
+      LOG.info("num iterators = " + itrs.size());
+    }
     return itrs;
   }
 
@@ -322,15 +334,19 @@ public class AccumuloPredicateHandler {
    *          ColumnMapping to filter
    * @param sc
    *          IndexSearchCondition
+   * @param binaryEncodedValues
+   *          whether the values are binary encoded
    * @return IteratorSetting
    * @throws SerDeException
    */
   public IteratorSetting toSetting(HiveAccumuloColumnMapping accumuloColumnMapping,
-      IndexSearchCondition sc) throws SerDeException {
+      IndexSearchCondition sc, boolean binaryEncodedValues) throws SerDeException {
     iteratorCount++;
     final IteratorSetting is = new IteratorSetting(iteratorCount,
-        PrimitiveComparisonFilter.FILTER_PREFIX + iteratorCount, PrimitiveComparisonFilter.class);
-    final String type = sc.getColumnDesc().getTypeString();
+        PrimitiveComparisonFilter.FILTER_PREFIX + iteratorCount,
+        PrimitiveComparisonFilter.class);
+    final String type = binaryEncodedValues ? sc.getColumnDesc().getTypeString()
+                                            : ColumnEncoding.STRING.getName();
     final String comparisonOpStr = sc.getComparisonOp();
 
     PushdownTuple tuple;
@@ -355,8 +371,9 @@ public class AccumuloPredicateHandler {
 
   public ExprNodeDesc getExpression(Configuration conf) {
     String filteredExprSerialized = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
-    if (filteredExprSerialized == null)
+    if (filteredExprSerialized == null) {
       return null;
+    }
 
     return SerializationUtilities.deserializeExpression(filteredExprSerialized);
   }
@@ -375,8 +392,9 @@ public class AccumuloPredicateHandler {
     }
     IndexPredicateAnalyzer analyzer = newAnalyzer(conf);
     ExprNodeDesc residual = analyzer.analyzePredicate(filterExpr, sConditions);
-    if (residual != null)
+    if (residual != null) {
       throw new RuntimeException("Unexpected residual predicate: " + residual.getExprString());
+    }
     return sConditions;
   }
 
@@ -394,8 +412,7 @@ public class AccumuloPredicateHandler {
     ExprNodeDesc residualPredicate = analyzer.analyzePredicate(desc, sConditions);
 
     if (sConditions.size() == 0) {
-      if (log.isInfoEnabled())
-        log.info("nothing to decompose. Returning");
+      LOG.info("nothing to decompose. Returning");
       return null;
     }
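
To make the iterator pushdown concrete: for a predicate such as name = 'bob' on a
string-encoded table, toSetting() builds an IteratorSetting along the lines of the
sketch below. The priority and name follow the iteratorCount scheme starting at 50;
the CompareOp class, Base64-encoded constant, and cf:cq column mapping go under the
filter's remaining option keys, and StringCompare stands in here for whichever
PrimitiveComparison implementation matches the type:

    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter;

    IteratorSetting is = new IteratorSetting(51,
        PrimitiveComparisonFilter.FILTER_PREFIX + 51, PrimitiveComparisonFilter.class);
    // select the byte-level comparison used to interpret cell values
    is.addOption(PrimitiveComparisonFilter.P_COMPARE_CLASS,
        "org.apache.hadoop.hive.accumulo.predicate.compare.StringCompare");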
 

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java
index 21392d1..afdc647 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java
@@ -1,10 +1,11 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -14,15 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.accumulo.predicate;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Stack;
+package org.apache.hadoop.hive.accumulo.predicate;
 
 import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters;
+import org.apache.hadoop.hive.accumulo.AccumuloIndexScanner;
+import org.apache.hadoop.hive.accumulo.AccumuloIndexScannerException;
+import org.apache.hadoop.hive.accumulo.AccumuloIndexLexicoder;
 import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloRowIdColumnMapping;
 import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
 import org.apache.hadoop.hive.accumulo.predicate.compare.Equal;
@@ -43,17 +44,19 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantByteObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantLongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantShortObjectInspector;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 /**
  *
  */
@@ -63,12 +66,27 @@ public class AccumuloRangeGenerator implements NodeProcessor {
   private final AccumuloPredicateHandler predicateHandler;
   private final HiveAccumuloRowIdColumnMapping rowIdMapping;
   private final String hiveRowIdColumnName;
+  private AccumuloIndexScanner indexScanner;
 
-  public AccumuloRangeGenerator(AccumuloPredicateHandler predicateHandler,
+  public AccumuloRangeGenerator(Configuration conf, AccumuloPredicateHandler predicateHandler,
       HiveAccumuloRowIdColumnMapping rowIdMapping, String hiveRowIdColumnName) {
     this.predicateHandler = predicateHandler;
     this.rowIdMapping = rowIdMapping;
     this.hiveRowIdColumnName = hiveRowIdColumnName;
+    try {
+      this.indexScanner = new AccumuloIndexParameters(conf).createScanner();
+    } catch (AccumuloIndexScannerException e) {
+      LOG.error(e.getLocalizedMessage(), e);
+      this.indexScanner = null;
+    }
+  }
+
+  public AccumuloIndexScanner getIndexScanner() {
+    return indexScanner;
+  }
+
+  public void setIndexScanner(AccumuloIndexScanner indexScanner) {
+    this.indexScanner = indexScanner;
   }
 
   @Override
@@ -234,13 +252,39 @@ public class AccumuloRangeGenerator implements NodeProcessor {
       return null;
     }
 
-    // Reject any clauses that are against a column that isn't the rowId mapping
+    ConstantObjectInspector objInspector = constantDesc.getWritableObjectInspector();
+
+    // Reject any clauses that are against a column that isn't the rowId mapping or indexed
     if (!this.hiveRowIdColumnName.equals(columnDesc.getColumn())) {
+      if (this.indexScanner != null && this.indexScanner.isIndexed(columnDesc.getColumn())) {
+        return getIndexedRowIds(genericUdf, leftHandNode, columnDesc.getColumn(), objInspector);
+      }
       return null;
     }
 
-    ConstantObjectInspector objInspector = constantDesc.getWritableObjectInspector();
+    Text constText = getConstantText(objInspector);
+
+    return getRange(genericUdf, leftHandNode, constText);
+  }
+
+  private Range getRange(GenericUDF genericUdf, ExprNodeDesc leftHandNode, Text constText) {
+    Class<? extends CompareOp> opClz;
+    try {
+      opClz = predicateHandler.getCompareOpClass(genericUdf.getUdfName());
+    } catch (NoSuchCompareOpException e) {
+      throw new IllegalArgumentException("Unhandled UDF class: " + genericUdf.getUdfName());
+    }
+
+    if (leftHandNode instanceof ExprNodeConstantDesc) {
+      return getConstantOpColumnRange(opClz, constText);
+    } else if (leftHandNode instanceof ExprNodeColumnDesc) {
+      return getColumnOpConstantRange(opClz, constText);
+    } else {
+      throw new IllegalStateException("Expected column or constant on LHS of expression");
+    }
+  }
 
+  private Text getConstantText(ConstantObjectInspector objInspector) throws SemanticException {
     Text constText;
     switch (rowIdMapping.getEncoding()) {
       case STRING:
@@ -257,21 +301,7 @@ public class AccumuloRangeGenerator implements NodeProcessor {
         throw new SemanticException("Unable to parse unknown encoding: "
             + rowIdMapping.getEncoding());
     }
-
-    Class<? extends CompareOp> opClz;
-    try {
-      opClz = predicateHandler.getCompareOpClass(genericUdf.getUdfName());
-    } catch (NoSuchCompareOpException e) {
-      throw new IllegalArgumentException("Unhandled UDF class: " + genericUdf.getUdfName());
-    }
-
-    if (leftHandNode instanceof ExprNodeConstantDesc) {
-      return getConstantOpColumnRange(opClz, constText);
-    } else if (leftHandNode instanceof ExprNodeColumnDesc) {
-      return getColumnOpConstantRange(opClz, constText);
-    } else {
-      throw new IllegalStateException("Expected column or constant on LHS of expression");
-    }
+    return constText;
   }
 
   protected Range getConstantOpColumnRange(Class<? extends CompareOp> opClz, Text constText) {
@@ -311,6 +341,21 @@ public class AccumuloRangeGenerator implements NodeProcessor {
     }
   }
 
+  protected Object getIndexedRowIds(GenericUDF genericUdf, ExprNodeDesc leftHandNode,
+                                    String columnName, ConstantObjectInspector objInspector)
+      throws SemanticException {
+    Text constText = getConstantText(objInspector);
+    byte[] value = constText.toString().getBytes(UTF_8);
+    byte[] encoded = AccumuloIndexLexicoder.encodeValue(value, objInspector.getTypeName(), true);
+    Range range = getRange(genericUdf, leftHandNode, new Text(encoded));
+    if (indexScanner != null) {
+      return indexScanner.getIndexRowRanges(columnName, range);
+    }
+    return null;
+  }
+
   protected Text getUtf8Value(ConstantObjectInspector objInspector) {
     // TODO is there a more correct way to get the literal value for the Object?
     return new Text(objInspector.getWritableConstantValue().toString());
@@ -327,7 +372,7 @@ public class AccumuloRangeGenerator implements NodeProcessor {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     if (objInspector instanceof PrimitiveObjectInspector) {
       LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
-        (PrimitiveObjectInspector) objInspector);
+          (PrimitiveObjectInspector) objInspector);
     } else {
       return getUtf8Value(objInspector);
     }
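
Putting getIndexedRowIds() together: a predicate on an indexed, non-rowid column is
first encoded exactly as the index writer encoded it, and the resulting Range is
resolved against the index table for matching base-table row ids. A rough sketch
(hypothetical column and value; assumes a string-encoded table and an equality
predicate):

    import org.apache.accumulo.core.data.Range;
    import org.apache.hadoop.hive.accumulo.AccumuloIndexLexicoder;
    import static java.nio.charset.StandardCharsets.UTF_8;

    byte[] encoded = AccumuloIndexLexicoder.encodeValue("bob".getBytes(UTF_8), "string", true);
    Range valueRange = Range.exact(new String(encoded, UTF_8));
    // indexScanner.getIndexRowRanges("name", valueRange) then returns the ranges
    // of base-table row ids whose "name" column matched the constant.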

http://git-wip-us.apache.org/repos/asf/hive/blob/169e6559/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java
index 17d5529..5121ea3 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -37,7 +37,6 @@ import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping;
 import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
 import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +53,7 @@ import com.google.common.collect.Lists;
  */
 public class PrimitiveComparisonFilter extends WholeRowIterator {
   @SuppressWarnings("unused")
-  private static final Logger log = LoggerFactory.getLogger(PrimitiveComparisonFilter.class);
+  private static final Logger LOG = LoggerFactory.getLogger(PrimitiveComparisonFilter.class);
 
   public static final String FILTER_PREFIX = "accumulo.filter.compare.iterator.";
   public static final String P_COMPARE_CLASS = "accumulo.filter.iterator.p.compare.class";
@@ -68,7 +67,7 @@ public class PrimitiveComparisonFilter extends WholeRowIterator {
 
   @Override
   protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) {
-    SortedMap<Key,Value> items;
+    SortedMap<Key, Value> items;
     boolean allow;
     try { // if key doesn't contain CF, it's an encoded value from a previous iterator.
       while (keys.get(0).getColumnFamily().getBytes().length == 0) {
@@ -103,11 +102,11 @@ public class PrimitiveComparisonFilter extends WholeRowIterator {
   }
 
   @Override
-  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+  public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
       IteratorEnvironment env) throws IOException {
     super.init(source, options, env);
     String serializedColumnMapping = options.get(COLUMN);
-    Entry<String,String> pair = ColumnMappingFactory.parseMapping(serializedColumnMapping);
+    Entry<String, String> pair = ColumnMappingFactory.parseMapping(serializedColumnMapping);
 
     // The ColumnEncoding, column name and type are all irrelevant at this point, just need the
     // cf:[cq]
@@ -135,7 +134,7 @@ public class PrimitiveComparisonFilter extends WholeRowIterator {
     }
   }
 
-  protected byte[] getConstant(Map<String,String> options) {
+  protected byte[] getConstant(Map<String, String> options) {
     String b64Const = options.get(CONST_VAL);
     return Base64.decodeBase64(b64Const.getBytes());
   }
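
Note that the comparison constant travels through the iterator options
Base64-encoded; getConstant() above reverses the encoding the client side applies
when building the IteratorSetting. A round-trip sketch using the same
commons-codec calls:

    import org.apache.commons.codec.binary.Base64;

    // client side: encode the constant for transport in the iterator options
    String b64Const = new String(Base64.encodeBase64("bob".getBytes()));

    // server side (getConstant): recover the original bytes
    byte[] constant = Base64.decodeBase64(b64Const.getBytes());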