You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2014/08/20 00:41:13 UTC
svn commit: r1619005 [2/9] - in /hive/trunk: ./ accumulo-handler/
accumulo-handler/src/ accumulo-handler/src/java/
accumulo-handler/src/java/org/ accumulo-handler/src/java/org/apache/
accumulo-handler/src/java/org/apache/hadoop/ accumulo-handler/src/ja...
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMappingFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMappingFactory.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMappingFactory.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMappingFactory.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.columns;
+
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ *
+ */
+public class ColumnMappingFactory {
+ private static final Logger log = Logger.getLogger(ColumnMappingFactory.class);
+
+ /**
+ * Generate the proper instance of a ColumnMapping
+ *
+ * @param columnSpec
+ * Specification for mapping this column to Accumulo
+ * @param defaultEncoding
+ * The default encoding in which values should be encoded to Accumulo
+ */
+ public static ColumnMapping get(String columnSpec, ColumnEncoding defaultEncoding,
+ String columnName, TypeInfo columnType) {
+ Preconditions.checkNotNull(columnSpec);
+ Preconditions.checkNotNull(columnName);
+ Preconditions.checkNotNull(columnType);
+ ColumnEncoding encoding = defaultEncoding;
+
+ // Check for column encoding specification
+ if (ColumnEncoding.hasColumnEncoding(columnSpec)) {
+ String columnEncodingStr = ColumnEncoding.getColumnEncoding(columnSpec);
+ columnSpec = ColumnEncoding.stripCode(columnSpec);
+
+ if (AccumuloHiveConstants.ROWID.equalsIgnoreCase(columnSpec)) {
+ return new HiveAccumuloRowIdColumnMapping(columnSpec,
+ ColumnEncoding.get(columnEncodingStr), columnName, columnType.getTypeName());
+ } else {
+ Entry<String,String> pair = parseMapping(columnSpec);
+
+ if (isPrefix(pair.getValue())) {
+ // Sanity check that, for a map, we got 2 encodings
+ if (!ColumnEncoding.isMapEncoding(columnEncodingStr)) {
+ throw new IllegalArgumentException("Expected map encoding for a map specification, "
+ + columnSpec + " with encoding " + columnEncodingStr);
+ }
+
+ Entry<ColumnEncoding,ColumnEncoding> encodings = ColumnEncoding
+ .getMapEncoding(columnEncodingStr);
+
+ return new HiveAccumuloMapColumnMapping(pair.getKey(), pair.getValue(),
+ encodings.getKey(), encodings.getValue(), columnName, columnType.getTypeName());
+ } else {
+ return new HiveAccumuloColumnMapping(pair.getKey(), pair.getValue(),
+ ColumnEncoding.getFromMapping(columnEncodingStr), columnName, columnType.getTypeName());
+ }
+ }
+ } else {
+ if (AccumuloHiveConstants.ROWID.equalsIgnoreCase(columnSpec)) {
+ return new HiveAccumuloRowIdColumnMapping(columnSpec, defaultEncoding, columnName,
+ columnType.getTypeName());
+ } else {
+ Entry<String,String> pair = parseMapping(columnSpec);
+ boolean isPrefix = isPrefix(pair.getValue());
+
+ String cq = pair.getValue();
+
+ // Replace any \* that appear in the prefix with a regular *
+ if (-1 != cq.indexOf(AccumuloHiveConstants.ESCAPED_ASTERISK)) {
+ cq = cq.replaceAll(AccumuloHiveConstants.ESCAPED_ASERTISK_REGEX,
+ Character.toString(AccumuloHiveConstants.ASTERISK));
+ }
+
+ if (isPrefix) {
+ return new HiveAccumuloMapColumnMapping(pair.getKey(), cq.substring(0, cq.length() - 1),
+ defaultEncoding, defaultEncoding, columnName, columnType.getTypeName());
+ } else {
+ return new HiveAccumuloColumnMapping(pair.getKey(), cq, encoding, columnName, columnType.getTypeName());
+ }
+ }
+ }
+ }
+
+ public static ColumnMapping getMap(String columnSpec, ColumnEncoding keyEncoding,
+ ColumnEncoding valueEncoding, String columnName, TypeInfo columnType) {
+ Entry<String,String> pair = parseMapping(columnSpec);
+ return new HiveAccumuloMapColumnMapping(pair.getKey(), pair.getValue(), keyEncoding,
+ valueEncoding, columnName, columnType.toString());
+
+ }
+
+ public static boolean isPrefix(String maybePrefix) {
+ Preconditions.checkNotNull(maybePrefix);
+
+ if (AccumuloHiveConstants.ASTERISK == maybePrefix.charAt(maybePrefix.length() - 1)) {
+ if (maybePrefix.length() > 1) {
+ return AccumuloHiveConstants.ESCAPE != maybePrefix.charAt(maybePrefix.length() - 2);
+ } else {
+ return true;
+ }
+ }
+
+ // If we couldn't find an asterisk, it's not a prefix
+ return false;
+ }
+
+ /**
+ * Consumes the column mapping specification and breaks it into column family and column
+ * qualifier.
+ */
+ public static Entry<String,String> parseMapping(String columnSpec)
+ throws InvalidColumnMappingException {
+ int index = 0;
+ while (true) {
+ if (index >= columnSpec.length()) {
+ log.error("Cannot parse '" + columnSpec + "' as colon-separated column configuration");
+ throw new InvalidColumnMappingException(
+ "Columns must be provided as colon-separated family and qualifier pairs");
+ }
+
+ index = columnSpec.indexOf(AccumuloHiveConstants.COLON, index);
+
+ if (-1 == index) {
+ log.error("Cannot parse '" + columnSpec + "' as colon-separated column configuration");
+ throw new InvalidColumnMappingException(
+ "Columns must be provided as colon-separated family and qualifier pairs");
+ }
+
+ // Check for an escape character before the colon
+ if (index - 1 > 0) {
+ char testChar = columnSpec.charAt(index - 1);
+ if (AccumuloHiveConstants.ESCAPE == testChar) {
+ // this colon is escaped, search again after it
+ index++;
+ continue;
+ }
+
+ // If the previous character isn't an escape characters, it's the separator
+ }
+
+ // Can't be escaped, it is the separator
+ break;
+ }
+
+ String cf = columnSpec.substring(0, index), cq = columnSpec.substring(index + 1);
+
+ // Check for the escaped colon to remove before doing the expensive regex replace
+ if (-1 != cf.indexOf(AccumuloHiveConstants.ESCAPED_COLON)) {
+ cf = cf.replaceAll(AccumuloHiveConstants.ESCAPED_COLON_REGEX,
+ Character.toString(AccumuloHiveConstants.COLON));
+ }
+
+ // Check for the escaped colon to remove before doing the expensive regex replace
+ if (-1 != cq.indexOf(AccumuloHiveConstants.ESCAPED_COLON)) {
+ cq = cq.replaceAll(AccumuloHiveConstants.ESCAPED_COLON_REGEX,
+ Character.toString(AccumuloHiveConstants.COLON));
+ }
+
+ return Maps.immutableEntry(cf, cq);
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloColumnMapping.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloColumnMapping.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloColumnMapping.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloColumnMapping.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.columns;
+
+import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Charsets;
+
+/**
+ * A Hive column which maps to a column family and column qualifier pair in Accumulo
+ */
+public class HiveAccumuloColumnMapping extends ColumnMapping {
+ @SuppressWarnings("unused")
+ private static final Logger log = Logger.getLogger(HiveAccumuloColumnMapping.class);
+
+ protected String columnFamily, columnQualifier;
+ protected byte[] columnFamilyBytes, columnQualifierBytes;
+
+ public HiveAccumuloColumnMapping(String cf, String cq, ColumnEncoding encoding,
+ String columnName, String columnType) {
+ super(cf + AccumuloHiveConstants.COLON + cq, encoding, columnName, columnType);
+
+ columnFamily = cf;
+ columnQualifier = cq;
+ }
+
+ public String getColumnFamily() {
+ return this.columnFamily;
+ }
+
+ /**
+ * Cached bytes for the columnFamily. Modifications to the bytes will affect those stored in this
+ * ColumnMapping -- such modifications are highly recommended against.
+ *
+ * @return UTF8 formatted bytes
+ */
+ public byte[] getColumnFamilyBytes() {
+ if (null == columnFamilyBytes) {
+ columnFamilyBytes = columnFamily.getBytes(Charsets.UTF_8);
+ }
+
+ return columnFamilyBytes;
+ }
+
+ public String getColumnQualifier() {
+ return this.columnQualifier;
+ }
+
+ /**
+ * Cached bytes for the columnQualifier. Modifications to the bytes will affect those stored in
+ * this ColumnMapping -- such modifications are highly recommended against.
+ *
+ * @return UTF8 formatted bytes
+ */
+ public byte[] getColumnQualifierBytes() {
+ if (null == columnQualifierBytes) {
+ columnQualifierBytes = columnQualifier.getBytes(Charsets.UTF_8);
+ }
+
+ return columnQualifierBytes;
+ }
+
+ public String serialize() {
+ StringBuilder sb = new StringBuilder(16);
+ sb.append(columnFamily).append(AccumuloHiveConstants.COLON);
+ if (null != columnQualifier) {
+ sb.append(columnQualifier);
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public String toString() {
+ return "[" + this.getClass().getSimpleName() + ": " + columnFamily + ":" + columnQualifier
+ + ", encoding " + encoding + "]";
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloMapColumnMapping.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloMapColumnMapping.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloMapColumnMapping.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloMapColumnMapping.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.columns;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * ColumnMapping that gathers every Accumulo column within one column family (optionally limited
+ * to qualifiers starting with a given prefix) into a single Hive map. Separate ColumnEncodings are
+ * kept for the keys and for the values of that map.
+ */
+public class HiveAccumuloMapColumnMapping extends ColumnMapping {
+
+  protected final String columnFamily, columnQualifierPrefix;
+  protected final ColumnEncoding keyEncoding, valueEncoding;
+
+  /**
+   * @param columnFamily
+   *          The column family whose qualifiers are collected into the same Hive map; must not be
+   *          null
+   * @param columnQualifierPrefix
+   *          The column qualifier prefix to include in the map, null is treated as an empty prefix
+   * @param keyEncoding
+   *          The encoding scheme for keys in this column family
+   * @param valueEncoding
+   *          The encoding scheme for the Accumulo values
+   * @param columnName
+   *          name of the Hive column
+   * @param columnType
+   *          Hive type name of the column
+   */
+  public HiveAccumuloMapColumnMapping(String columnFamily, String columnQualifierPrefix,
+      ColumnEncoding keyEncoding, ColumnEncoding valueEncoding, String columnName,
+      String columnType) {
+    // The base class needs a spec before the null check below can run, so substitute "" for null
+    super((columnFamily != null ? columnFamily : "") + AccumuloHiveConstants.COLON, valueEncoding,
+        columnName, columnType);
+
+    Preconditions.checkNotNull(columnFamily, "Must provide a column family");
+
+    this.columnFamily = columnFamily;
+    this.columnQualifierPrefix = (columnQualifierPrefix != null) ? columnQualifierPrefix : "";
+    this.keyEncoding = keyEncoding;
+    this.valueEncoding = valueEncoding;
+  }
+
+  /** @return the column family gathered into the map */
+  public String getColumnFamily() {
+    return columnFamily;
+  }
+
+  /** @return the qualifier prefix (never null; empty when no prefix was given) */
+  public String getColumnQualifierPrefix() {
+    return columnQualifierPrefix;
+  }
+
+  /** @return the encoding used for map keys */
+  public ColumnEncoding getKeyEncoding() {
+    return keyEncoding;
+  }
+
+  /** @return the encoding used for map values */
+  public ColumnEncoding getValueEncoding() {
+    return valueEncoding;
+  }
+
+  /**
+   * Two map mappings are equal when family, qualifier prefix, key encoding and value encoding all
+   * match; the Hive column name and type are not compared.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof HiveAccumuloMapColumnMapping)) {
+      return false;
+    }
+
+    HiveAccumuloMapColumnMapping that = (HiveAccumuloMapColumnMapping) o;
+    return columnFamily.equals(that.columnFamily)
+        && columnQualifierPrefix.equals(that.columnQualifierPrefix)
+        && keyEncoding.equals(that.keyEncoding)
+        && valueEncoding.equals(that.valueEncoding);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(23, 31).append(columnFamily).append(columnQualifierPrefix)
+        .append(keyEncoding).append(valueEncoding).toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    String simpleName = getClass().getSimpleName();
+    return "[" + simpleName + ": " + columnFamily + ":" + columnQualifierPrefix
+        + "* encoding: " + keyEncoding + ":" + valueEncoding + "]";
+  }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloRowIdColumnMapping.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloRowIdColumnMapping.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloRowIdColumnMapping.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloRowIdColumnMapping.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.columns;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * {@link ColumnMapping} which corresponds to the Hive column which should be used as the rowID in a
+ * {@link Mutation}
+ */
+public class HiveAccumuloRowIdColumnMapping extends ColumnMapping {
+
+ public HiveAccumuloRowIdColumnMapping(String columnSpec, ColumnEncoding encoding,
+ String columnName, String columnType) {
+ super(columnSpec, encoding, columnName, columnType);
+
+ // Ensure that we have the correct identifier as the column name
+ Preconditions.checkArgument(columnSpec.equalsIgnoreCase(AccumuloHiveConstants.ROWID));
+ }
+
+ @Override
+ public String toString() {
+ return "[" + this.getClass().getSimpleName() + ", " + this.mappingSpec + ", encoding "
+ + encoding + "]";
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveColumn.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveColumn.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveColumn.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveColumn.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.columns;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Immutable pairing of a Hive column's name with its Hive type. Neither value may be null.
+ */
+public class HiveColumn {
+
+  // Name of this column in the Hive schema
+  protected final String columnName;
+
+  // Hive type of this column
+  protected final TypeInfo columnType;
+
+  /**
+   * @param columnName
+   *          the column name; must not be null
+   * @param columnType
+   *          the column type; must not be null
+   * @throws NullPointerException
+   *           if either argument is null
+   */
+  public HiveColumn(String columnName, TypeInfo columnType) {
+    this.columnName = Preconditions.checkNotNull(columnName);
+    this.columnType = Preconditions.checkNotNull(columnType);
+  }
+
+  /**
+   * @return the name of the Hive column
+   */
+  public String getColumnName() {
+    return this.columnName;
+  }
+
+  /**
+   * @return the Hive type of the column
+   */
+  public TypeInfo getColumnType() {
+    return this.columnType;
+  }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/InvalidColumnMappingException.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/InvalidColumnMappingException.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/InvalidColumnMappingException.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/InvalidColumnMappingException.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.columns;
+
+/**
+ * Signals that a column-mapping specification cannot be parsed or is otherwise invalid. Being an
+ * {@link IllegalArgumentException}, it is unchecked.
+ */
+public class InvalidColumnMappingException extends IllegalArgumentException {
+
+  private static final long serialVersionUID = 1L;
+
+  /** Creates an exception with no detail message and no cause. */
+  public InvalidColumnMappingException() {
+    super();
+  }
+
+  /** Creates an exception wrapping {@code cause}. */
+  public InvalidColumnMappingException(Throwable cause) {
+    super(cause);
+  }
+
+  /** Creates an exception with the given detail message. */
+  public InvalidColumnMappingException(String msg) {
+    super(msg);
+  }
+
+  /** Creates an exception with the given detail message and cause. */
+  public InvalidColumnMappingException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloRecordReader.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloRecordReader.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloRecordReader.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.mr;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.hive.accumulo.AccumuloHiveRow;
+import org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Translate the {@link Key} {@link Value} pairs from {@link AccumuloInputFormat} to a
+ * {@link Writable} for consumption by the {@link AccumuloSerDe}.
+ *
+ * <p>
+ * Each call to {@link #next(Text, AccumuloHiveRow)} consumes one row from the wrapped reader and
+ * collects its entries. When {@code iteratorCount} is non-zero, the first entry is decoded with
+ * {@link PrimitiveComparisonFilter#decodeRow(Key, Value)} once per iterator (each iterator adds a
+ * level of encoding) before the entries are flattened into the {@link AccumuloHiveRow}.
+ */
+public class HiveAccumuloRecordReader implements RecordReader<Text,AccumuloHiveRow> {
+  private final RecordReader<Text,PeekingIterator<Entry<Key,Value>>> recordReader;
+  private final int iteratorCount;
+
+  /**
+   * @param recordReader
+   *          reader yielding one row id and an iterator over that row's entries per call
+   * @param iteratorCount
+   *          number of encoding levels to undo on each row
+   */
+  public HiveAccumuloRecordReader(
+      RecordReader<Text,PeekingIterator<Entry<Key,Value>>> recordReader, int iteratorCount) {
+    this.recordReader = recordReader;
+    this.iteratorCount = iteratorCount;
+  }
+
+  @Override
+  public void close() throws IOException {
+    recordReader.close();
+  }
+
+  @Override
+  public Text createKey() {
+    return new Text();
+  }
+
+  @Override
+  public AccumuloHiveRow createValue() {
+    return new AccumuloHiveRow();
+  }
+
+  /** Position is not tracked; always 0. */
+  @Override
+  public long getPos() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return recordReader.getProgress();
+  }
+
+  /**
+   * Reads the next row.
+   *
+   * @param rowKey
+   *          populated with the row id of the row read (when non-null)
+   * @param row
+   *          cleared and filled with the row id and the row's (decoded) entries
+   * @return false when the wrapped reader is exhausted
+   */
+  @Override
+  public boolean next(Text rowKey, AccumuloHiveRow row) throws IOException {
+    Text key = recordReader.createKey();
+    PeekingIterator<Map.Entry<Key,Value>> iter = recordReader.createValue();
+    if (!recordReader.next(key, iter)) {
+      return false;
+    }
+
+    // Honor the RecordReader contract: the caller's key must reflect the row just read
+    if (null != rowKey) {
+      rowKey.set(key);
+    }
+
+    row.clear();
+    row.setRowId(key.toString());
+
+    // collect key/values for this row.
+    List<Key> keys = Lists.newArrayList();
+    List<Value> values = Lists.newArrayList();
+    while (iter.hasNext()) {
+      Map.Entry<Key,Value> kv = iter.next();
+      keys.add(kv.getKey());
+      values.add(kv.getValue());
+    }
+
+    // each iterator creates a level of encoding; stop if nothing is left to decode rather than
+    // failing on keys.get(0)
+    for (int i = 0; i < iteratorCount && !keys.isEmpty(); i++) {
+      SortedMap<Key,Value> decoded = PrimitiveComparisonFilter.decodeRow(keys.get(0),
+          values.get(0));
+      keys = Lists.newArrayList(decoded.keySet());
+      values = Lists.newArrayList(decoded.values());
+    }
+
+    pushToValue(keys, values, row);
+    return true;
+  }
+
+  // flatten key/value pairs into row object for use in Serde.
+  private void pushToValue(List<Key> keys, List<Value> values, AccumuloHiveRow row)
+      throws IOException {
+    Iterator<Key> kIter = keys.iterator();
+    Iterator<Value> vIter = values.iterator();
+    while (kIter.hasNext()) {
+      Key k = kIter.next();
+      Value v = vIter.next();
+      row.add(k.getColumnFamily().toString(), k.getColumnQualifier().toString(), v.get());
+    }
+  }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo.mr;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.mapred.RangeInputSplit;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Wraps a {@link RangeInputSplit} in a {@link FileSplit} so Hadoop won't complain when it tries
+ * to make its own Path.
+ *
+ * <p>
+ * If the {@link RangeInputSplit} is used directly, it will hit a branch of code in
+ * {@link HiveInputSplit} which generates an invalid Path. Wrapping it here avoids that error. The
+ * FileSplit fields are always written/read first, followed by the wrapped split.
+ */
+public class HiveAccumuloSplit extends FileSplit implements InputSplit {
+  private static final Logger log = Logger.getLogger(HiveAccumuloSplit.class);
+
+  private RangeInputSplit split;
+
+  /** Creates an empty split (null path, empty range split), e.g. before readFields. */
+  public HiveAccumuloSplit() {
+    this(new RangeInputSplit(), null);
+  }
+
+  /**
+   * @param split
+   *          the Accumulo split to wrap
+   * @param dummyPath
+   *          placeholder path reported through the FileSplit API
+   */
+  public HiveAccumuloSplit(RangeInputSplit split, Path dummyPath) {
+    super(dummyPath, 0, 0, (String[]) null);
+    this.split = split;
+  }
+
+  /** @return the wrapped Accumulo split */
+  public RangeInputSplit getSplit() {
+    return split;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    split.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    split.readFields(in);
+  }
+
+  /** Length of the wrapped split; logs and reports 0 if it cannot be determined. */
+  @Override
+  public long getLength() {
+    try {
+      return split.getLength();
+    } catch (IOException e) {
+      log.error("Error getting length for split: " + StringUtils.stringifyException(e));
+      return 0L;
+    }
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+    return split.getLocations();
+  }
+
+  @Override
+  public String toString() {
+    return "HiveAccumuloSplit: " + split;
+  }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,485 @@
+package org.apache.hadoop.hive.accumulo.mr;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapred.AccumuloRowInputFormat;
+import org.apache.accumulo.core.client.mapred.RangeInputSplit;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters;
+import org.apache.hadoop.hive.accumulo.AccumuloHiveRow;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMapper;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMapping;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloMapColumnMapping;
+import org.apache.hadoop.hive.accumulo.predicate.AccumuloPredicateHandler;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.apache.hadoop.hive.accumulo.serde.TooManyAccumuloColumnsException;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps the older (mapred) Accumulo InputFormat for use with Hive.
+ *
+ * <p>
+ * Configures the input scan with the ranges, iterators, and columns derived from the serde
+ * properties of the Hive table and from any pushed-down predicates.
+ */
+public class HiveAccumuloTableInputFormat implements
+    org.apache.hadoop.mapred.InputFormat<Text,AccumuloHiveRow> {
+  private static final Logger log = LoggerFactory.getLogger(HiveAccumuloTableInputFormat.class);
+
+  // Visible for testing
+  protected AccumuloRowInputFormat accumuloInputFormat = new AccumuloRowInputFormat();
+  protected AccumuloPredicateHandler predicateHandler = AccumuloPredicateHandler.getInstance();
+
+  /**
+   * Configures the underlying {@link AccumuloRowInputFormat} from the job configuration and wraps
+   * every split it produces in a {@link HiveAccumuloSplit}.
+   *
+   * @param jobConf
+   *          job configuration carrying the Accumulo connection and Hive table properties
+   * @param numSplits
+   *          split count hint passed through to the Accumulo InputFormat
+   * @return the wrapped splits; an empty array when the predicates produced an empty set of
+   *         ranges
+   * @throws IOException
+   *           if the column mapping is invalid, Accumulo cannot be configured, more columns are
+   *           read than are mapped, or splits exist but the job has no input path
+   */
+  @Override
+  public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
+    final AccumuloConnectionParameters accumuloParams = new AccumuloConnectionParameters(jobConf);
+    final Instance instance = accumuloParams.getInstance();
+    final ColumnMapper columnMapper;
+    try {
+      columnMapper = getColumnMapper(jobConf);
+    } catch (TooManyAccumuloColumnsException e) {
+      throw new IOException(e);
+    }
+
+    JobContext context = ShimLoader.getHadoopShims().newJobContext(Job.getInstance(jobConf));
+    Path[] tablePaths = FileInputFormat.getInputPaths(context);
+
+    try {
+      final Connector connector = accumuloParams.getConnector(instance);
+      final List<ColumnMapping> columnMappings = columnMapper.getColumnMappings();
+      final List<IteratorSetting> iterators = predicateHandler.getIterators(jobConf, columnMapper);
+      final Collection<Range> ranges = predicateHandler.getRanges(jobConf, columnMapper);
+
+      // Setting an empty collection of ranges will, unexpectedly, scan all data.
+      // We don't want that.
+      if (null != ranges && ranges.isEmpty()) {
+        return new InputSplit[0];
+      }
+
+      // Set the relevant information in the Configuration for the AccumuloInputFormat
+      configure(jobConf, instance, connector, accumuloParams, columnMapper, iterators, ranges);
+
+      int numColumns = columnMappings.size();
+
+      List<Integer> readColIds = ColumnProjectionUtils.getReadColumnIDs(jobConf);
+
+      // Sanity check
+      if (numColumns < readColIds.size()) {
+        throw new IOException("Number of column mappings (" + numColumns + ")"
+            + " numbers less than the hive table columns. (" + readColIds.size() + ")");
+      }
+
+      // get splits from Accumulo
+      InputSplit[] splits = accumuloInputFormat.getSplits(jobConf, numSplits);
+
+      // Every HiveAccumuloSplit needs a Path for FileSplit; fail with a clear message rather
+      // than an ArrayIndexOutOfBoundsException when no input path is configured.
+      if (splits.length > 0 && tablePaths.length == 0) {
+        throw new IOException("No input path is configured for the Hive table, cannot create "
+            + splits.length + " splits");
+      }
+
+      HiveAccumuloSplit[] hiveSplits = new HiveAccumuloSplit[splits.length];
+      for (int i = 0; i < splits.length; i++) {
+        RangeInputSplit ris = (RangeInputSplit) splits[i];
+        hiveSplits[i] = new HiveAccumuloSplit(ris, tablePaths[0]);
+      }
+
+      return hiveSplits;
+    } catch (AccumuloException e) {
+      log.error("Could not configure AccumuloInputFormat", e);
+      throw new IOException("Could not configure AccumuloInputFormat", e);
+    } catch (AccumuloSecurityException e) {
+      log.error("Could not configure AccumuloInputFormat", e);
+      throw new IOException("Could not configure AccumuloInputFormat", e);
+    } catch (SerDeException e) {
+      log.error("Could not configure AccumuloInputFormat", e);
+      throw new IOException("Could not configure AccumuloInputFormat", e);
+    }
+  }
+
+  /**
+   * Creates a RecordReader for the given split by delegating to the mapred
+   * {@link AccumuloRowInputFormat} and wrapping its reader in a {@link HiveAccumuloRecordReader}.
+   *
+   * @param inputSplit
+   *          the split to read; expected to be a {@link HiveAccumuloSplit}
+   * @param jobConf
+   *          job configuration carrying the Hive table properties
+   * @param reporter
+   *          progress reporter passed through to the Accumulo RecordReader
+   * @return RecordReader of Text keys and {@link AccumuloHiveRow} values
+   * @throws IOException
+   *           if the column mapping or iterators cannot be computed, or the table name cannot be
+   *           read from or set on the split
+   */
+  @Override
+  public RecordReader<Text,AccumuloHiveRow> getRecordReader(InputSplit inputSplit,
+      final JobConf jobConf, final Reporter reporter) throws IOException {
+    final ColumnMapper columnMapper;
+    try {
+      columnMapper = getColumnMapper(jobConf);
+    } catch (TooManyAccumuloColumnsException e) {
+      throw new IOException(e);
+    }
+
+    try {
+      final List<IteratorSetting> iterators = predicateHandler.getIterators(jobConf, columnMapper);
+
+      HiveAccumuloSplit hiveSplit = (HiveAccumuloSplit) inputSplit;
+      RangeInputSplit rangeSplit = hiveSplit.getSplit();
+
+      log.info("Split: " + rangeSplit);
+
+      // The RangeInputSplit *should* have all of the necessary information contained in it,
+      // which frees us from re-parsing our configuration from the AccumuloStorageHandler
+      // and re-setting it into the Configuration (as we did in getSplits(...)). Thus, it should
+      // be unnecessary to re-invoke configure(...).
+
+      // ACCUMULO-2962 Iterators weren't getting serialized into the InputSplit, but we can
+      // compensate because we still have that info.
+      // Should be fixed in Accumulo 1.5.2 and 1.6.1
+      if (null == rangeSplit.getIterators()
+          || (rangeSplit.getIterators().isEmpty() && !iterators.isEmpty())) {
+        log.debug("Re-setting iterators on InputSplit due to Accumulo bug.");
+        rangeSplit.setIterators(iterators);
+      }
+
+      // ACCUMULO-3015 Like the above, the RangeInputSplit should carry the table name but may
+      // not, so re-set it from our configuration when it is missing.
+      if (null == getTableName(rangeSplit)) {
+        final AccumuloConnectionParameters accumuloParams = new AccumuloConnectionParameters(
+            jobConf);
+        log.debug("Re-setting table name on InputSplit due to Accumulo bug.");
+        setTableName(rangeSplit, accumuloParams.getAccumuloTableName());
+      }
+
+      final RecordReader<Text,PeekingIterator<Map.Entry<Key,Value>>> recordReader =
+          accumuloInputFormat.getRecordReader(rangeSplit, jobConf, reporter);
+
+      return new HiveAccumuloRecordReader(recordReader, iterators.size());
+    } catch (SerDeException e) {
+      // Keep the SerDeException as the cause instead of flattening it into a string
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Builds the {@link ColumnMapper} for the Hive table described in the configuration.
+   *
+   * @param conf
+   *          configuration holding the Hive column names/types and the Accumulo column mapping
+   * @return the column mapper for the table
+   * @throws IOException
+   *           if the Hive column names or types are missing from the configuration
+   * @throws TooManyAccumuloColumnsException
+   *           propagated from the {@link ColumnMapper} constructor
+   */
+  protected ColumnMapper getColumnMapper(Configuration conf) throws IOException,
+      TooManyAccumuloColumnsException {
+    final String defaultStorageType = conf.get(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE);
+
+    String[] columnNamesArr = conf.getStrings(serdeConstants.LIST_COLUMNS);
+    if (null == columnNamesArr) {
+      throw new IOException(
+          "Hive column names must be provided to InputFormat in the Configuration");
+    }
+    List<String> columnNames = Arrays.asList(columnNamesArr);
+
+    String serializedTypes = conf.get(serdeConstants.LIST_COLUMN_TYPES);
+    if (null == serializedTypes) {
+      throw new IOException(
+          "Hive column types must be provided to InputFormat in the Configuration");
+    }
+    ArrayList<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(serializedTypes);
+
+    return new ColumnMapper(conf.get(AccumuloSerDeParameters.COLUMN_MAPPINGS), defaultStorageType,
+        columnNames, columnTypes);
+  }
+
+  /**
+   * Configure the underlying AccumuloInputFormat.
+   *
+   * @param conf
+   *          Job configuration
+   * @param instance
+   *          Accumulo instance
+   * @param connector
+   *          Accumulo connector
+   * @param accumuloParams
+   *          Connection information to the Accumulo instance
+   * @param columnMapper
+   *          Configuration of Hive to Accumulo columns
+   * @param iterators
+   *          Any iterators to be configured server-side
+   * @param ranges
+   *          Accumulo ranges for the query; null leaves the ranges unset, which scans the whole
+   *          table
+   * @throws AccumuloSecurityException
+   * @throws AccumuloException
+   * @throws SerDeException
+   */
+  protected void configure(JobConf conf, Instance instance, Connector connector,
+      AccumuloConnectionParameters accumuloParams, ColumnMapper columnMapper,
+      List<IteratorSetting> iterators, Collection<Range> ranges) throws AccumuloSecurityException,
+      AccumuloException, SerDeException {
+
+    // Handle implementation of Instance and invoke appropriate InputFormat method
+    if (instance instanceof MockInstance) {
+      setMockInstance(conf, instance.getInstanceName());
+    } else {
+      setZooKeeperInstance(conf, instance.getInstanceName(), instance.getZooKeepers());
+    }
+
+    // Set the username/passwd for the Accumulo connection
+    setConnectorInfo(conf, accumuloParams.getAccumuloUserName(),
+        new PasswordToken(accumuloParams.getAccumuloPassword()));
+
+    // Read from the given Accumulo table
+    setInputTableName(conf, accumuloParams.getAccumuloTableName());
+
+    // Check Configuration for any user-provided Authorization definition
+    Authorizations auths = AccumuloSerDeParameters.getAuthorizationsFromConf(conf);
+
+    if (null == auths) {
+      // Default to all of user's authorizations when no configuration is provided
+      auths = connector.securityOperations().getUserAuthorizations(
+          accumuloParams.getAccumuloUserName());
+    }
+
+    // Implicitly handles users providing invalid authorizations
+    setScanAuthorizations(conf, auths);
+
+    // restrict with any filters found from WHERE predicates.
+    addIterators(conf, iterators);
+
+    // restrict with any ranges found from WHERE predicates.
+    // not setting ranges scans the entire table
+    if (null != ranges) {
+      log.info("Setting ranges: " + ranges);
+      setRanges(conf, ranges);
+    }
+
+    // Restrict the set of columns that we want to read from the Accumulo table
+    HashSet<Pair<Text,Text>> pairs = getPairCollection(columnMapper.getColumnMappings());
+    if (null != pairs && !pairs.isEmpty()) {
+      fetchColumns(conf, pairs);
+    }
+  }
+
+  // Wrap the static AccumuloInputFormat methods with methods that we can
+  // verify were correctly called via Mockito
+
+  protected void setMockInstance(JobConf conf, String instanceName) {
+    try {
+      AccumuloInputFormat.setMockInstance(conf, instanceName);
+    } catch (IllegalStateException e) {
+      // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
+      log.debug("Ignoring exception setting mock instance of " + instanceName, e);
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  protected void setZooKeeperInstance(JobConf conf, String instanceName, String zkHosts) {
+    // To support builds against 1.5, we can't use the new 1.6 setZooKeeperInstance which
+    // takes a ClientConfiguration class that only exists in 1.6
+    try {
+      AccumuloInputFormat.setZooKeeperInstance(conf, instanceName, zkHosts);
+    } catch (IllegalStateException ise) {
+      // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
+      log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at "
+          + zkHosts, ise);
+    }
+  }
+
+  protected void setConnectorInfo(JobConf conf, String user, AuthenticationToken token)
+      throws AccumuloSecurityException {
+    try {
+      AccumuloInputFormat.setConnectorInfo(conf, user, token);
+    } catch (IllegalStateException e) {
+      // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
+      log.debug("Ignoring exception setting Accumulo Connector instance for user " + user, e);
+    }
+  }
+
+  protected void setInputTableName(JobConf conf, String tableName) {
+    AccumuloInputFormat.setInputTableName(conf, tableName);
+  }
+
+  protected void setScanAuthorizations(JobConf conf, Authorizations auths) {
+    AccumuloInputFormat.setScanAuthorizations(conf, auths);
+  }
+
+  protected void addIterators(JobConf conf, List<IteratorSetting> iterators) {
+    for (IteratorSetting is : iterators) {
+      AccumuloInputFormat.addIterator(conf, is);
+    }
+  }
+
+  protected void setRanges(JobConf conf, Collection<Range> ranges) {
+    AccumuloInputFormat.setRanges(conf, ranges);
+  }
+
+  protected void fetchColumns(JobConf conf, Set<Pair<Text,Text>> cfCqPairs) {
+    AccumuloInputFormat.fetchColumns(conf, cfCqPairs);
+  }
+
+  /**
+   * Builds the column family/qualifier pairs to fetch from the given column mappings. Only
+   * {@link HiveAccumuloColumnMapping} and {@link HiveAccumuloMapColumnMapping} contribute pairs;
+   * any other mapping type (such as the rowID mapping) is skipped.
+   *
+   * @param columnMappings
+   *          The list of ColumnMappings for the given query
+   * @return a Set of Pairs of colfams and colquals; a null qualifier means the whole family
+   */
+  protected HashSet<Pair<Text,Text>> getPairCollection(List<ColumnMapping> columnMappings) {
+    final HashSet<Pair<Text,Text>> pairs = new HashSet<Pair<Text,Text>>();
+
+    for (ColumnMapping columnMapping : columnMappings) {
+      if (columnMapping instanceof HiveAccumuloColumnMapping) {
+        HiveAccumuloColumnMapping accumuloColumnMapping = (HiveAccumuloColumnMapping) columnMapping;
+
+        Text cf = new Text(accumuloColumnMapping.getColumnFamily());
+        Text cq = null;
+
+        // A null cq implies an empty column qualifier
+        if (null != accumuloColumnMapping.getColumnQualifier()) {
+          cq = new Text(accumuloColumnMapping.getColumnQualifier());
+        }
+
+        pairs.add(new Pair<Text,Text>(cf, cq));
+      } else if (columnMapping instanceof HiveAccumuloMapColumnMapping) {
+        HiveAccumuloMapColumnMapping mapMapping = (HiveAccumuloMapColumnMapping) columnMapping;
+
+        // Can't fetch prefix on colqual, must pull the entire qualifier
+        // TODO use an iterator to do the filter, server-side.
+        pairs.add(new Pair<Text,Text>(new Text(mapMapping.getColumnFamily()), null));
+      }
+    }
+
+    log.info("Computed columns to fetch (" + pairs + ") from " + columnMappings);
+
+    return pairs;
+  }
+
+  /**
+   * Reflection to work around Accumulo 1.5 and 1.6 incompatibilities: tries
+   * {@code getTableName()} first and falls back to {@code getTable()}. Throws an
+   * {@link IOException} if the fallback cannot be found or invoked.
+   *
+   * @param split
+   *          A RangeInputSplit
+   * @return The name of the table from the split
+   * @throws IOException
+   */
+  protected String getTableName(RangeInputSplit split) throws IOException {
+    // ACCUMULO-3017 shenanigans with method names changing without deprecation
+    Method getTableName = null;
+    try {
+      getTableName = RangeInputSplit.class.getMethod("getTableName");
+    } catch (SecurityException e) {
+      log.debug("Could not get getTableName method from RangeInputSplit", e);
+    } catch (NoSuchMethodException e) {
+      log.debug("Could not get getTableName method from RangeInputSplit", e);
+    }
+
+    if (null != getTableName) {
+      try {
+        return (String) getTableName.invoke(split);
+      } catch (IllegalArgumentException e) {
+        log.debug("Could not invoke getTableName method from RangeInputSplit", e);
+      } catch (IllegalAccessException e) {
+        log.debug("Could not invoke getTableName method from RangeInputSplit", e);
+      } catch (InvocationTargetException e) {
+        log.debug("Could not invoke getTableName method from RangeInputSplit", e);
+      }
+    }
+
+    Method getTable;
+    try {
+      getTable = RangeInputSplit.class.getMethod("getTable");
+    } catch (SecurityException e) {
+      throw new IOException("Could not get table name from RangeInputSplit", e);
+    } catch (NoSuchMethodException e) {
+      throw new IOException("Could not get table name from RangeInputSplit", e);
+    }
+
+    try {
+      return (String) getTable.invoke(split);
+    } catch (IllegalArgumentException e) {
+      throw new IOException("Could not get table name from RangeInputSplit", e);
+    } catch (IllegalAccessException e) {
+      throw new IOException("Could not get table name from RangeInputSplit", e);
+    } catch (InvocationTargetException e) {
+      throw new IOException("Could not get table name from RangeInputSplit", e);
+    }
+  }
+
+  /**
+   * Sets the table name on a RangeInputSplit, accounting for the change in method name: tries
+   * {@code setTableName(String)} first and falls back to {@code setTable(String)}. Any reflection
+   * related exception from the fallback is wrapped in an {@link IOException}.
+   *
+   * @param split
+   *          The RangeInputSplit to operate on
+   * @param tableName
+   *          The name of the table to set
+   * @throws IOException
+   */
+  protected void setTableName(RangeInputSplit split, String tableName) throws IOException {
+    // ACCUMULO-3017 shenanigans with method names changing without deprecation
+    Method setTableName = null;
+    try {
+      setTableName = RangeInputSplit.class.getMethod("setTableName", String.class);
+    } catch (SecurityException e) {
+      log.debug("Could not get setTableName method from RangeInputSplit", e);
+    } catch (NoSuchMethodException e) {
+      log.debug("Could not get setTableName method from RangeInputSplit", e);
+    }
+
+    if (null != setTableName) {
+      try {
+        setTableName.invoke(split, tableName);
+        return;
+      } catch (IllegalArgumentException e) {
+        log.debug("Could not invoke setTableName method from RangeInputSplit", e);
+      } catch (IllegalAccessException e) {
+        log.debug("Could not invoke setTableName method from RangeInputSplit", e);
+      } catch (InvocationTargetException e) {
+        log.debug("Could not invoke setTableName method from RangeInputSplit", e);
+      }
+    }
+
+    Method setTable;
+    try {
+      setTable = RangeInputSplit.class.getMethod("setTable", String.class);
+    } catch (SecurityException e) {
+      throw new IOException("Could not set table name from RangeInputSplit", e);
+    } catch (NoSuchMethodException e) {
+      throw new IOException("Could not set table name from RangeInputSplit", e);
+    }
+
+    try {
+      setTable.invoke(split, tableName);
+    } catch (IllegalArgumentException e) {
+      throw new IOException("Could not set table name from RangeInputSplit", e);
+    } catch (IllegalAccessException e) {
+      throw new IOException("Could not set table name from RangeInputSplit", e);
+    } catch (InvocationTargetException e) {
+      throw new IOException("Could not set table name from RangeInputSplit", e);
+    }
+  }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.mr;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.apache.hadoop.mapred.JobConf;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *
+ */
+public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat {
+
+ @Override
+ public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+ configureAccumuloOutputFormat(job);
+
+ super.checkOutputSpecs(ignored, job);
+ }
+
+ protected void configureAccumuloOutputFormat(JobConf job) throws IOException {
+ AccumuloConnectionParameters cnxnParams = new AccumuloConnectionParameters(job);
+
+ final String tableName = job.get(AccumuloSerDeParameters.TABLE_NAME);
+
+ // Make sure we actually go the table name
+ Preconditions.checkNotNull(tableName,
+ "Expected Accumulo table name to be provided in job configuration");
+
+ // Set the necessary Accumulo information
+ try {
+ // Username/passwd for Accumulo
+ setAccumuloConnectorInfo(job, cnxnParams.getAccumuloUserName(),
+ new PasswordToken(cnxnParams.getAccumuloPassword()));
+
+ if (cnxnParams.useMockInstance()) {
+ setAccumuloMockInstance(job, cnxnParams.getAccumuloInstanceName());
+ } else {
+ // Accumulo instance name with ZK quorum
+ setAccumuloZooKeeperInstance(job, cnxnParams.getAccumuloInstanceName(),
+ cnxnParams.getZooKeepers());
+ }
+
+ // Set the table where we're writing this data
+ setDefaultAccumuloTableName(job, tableName);
+ } catch (AccumuloSecurityException e) {
+ log.error("Could not connect to Accumulo with provided credentials", e);
+ throw new IOException(e);
+ }
+ }
+
+ // Non-static methods to wrap the static AccumuloOutputFormat methods to enable testing
+
+ protected void setAccumuloConnectorInfo(JobConf conf, String username, AuthenticationToken token)
+ throws AccumuloSecurityException {
+ AccumuloOutputFormat.setConnectorInfo(conf, username, token);
+ }
+
+ @SuppressWarnings("deprecation")
+ protected void setAccumuloZooKeeperInstance(JobConf conf, String instanceName, String zookeepers) {
+ AccumuloOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers);
+ }
+
+ protected void setAccumuloMockInstance(JobConf conf, String instanceName) {
+ AccumuloOutputFormat.setMockInstance(conf, instanceName);
+ }
+
+ protected void setDefaultAccumuloTableName(JobConf conf, String tableName) {
+ AccumuloOutputFormat.setDefaultTableName(conf, tableName);
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,4 @@
+/**
+ * Serde and InputFormat support for connecting Hive to Accumulo tables.
+ */
+package org.apache.hadoop.hive.accumulo;
\ No newline at end of file
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,408 @@
+package org.apache.hadoop.hive.accumulo.predicate;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.Range;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMapper;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping;
+import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
+import org.apache.hadoop.hive.accumulo.predicate.compare.DoubleCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.Equal;
+import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThan;
+import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThanOrEqual;
+import org.apache.hadoop.hive.accumulo.predicate.compare.IntCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LessThan;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LessThanOrEqual;
+import org.apache.hadoop.hive.accumulo.predicate.compare.Like;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LongCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.NotEqual;
+import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;
+import org.apache.hadoop.hive.accumulo.predicate.compare.StringCompare;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
+import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.udf.UDFLike;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ *
+ * Supporting operations dealing with Hive Predicate pushdown to iterators and ranges.
+ *
+ * See {@link PrimitiveComparisonFilter}
+ *
+ */
+public class AccumuloPredicateHandler {
+ // Single unbounded Range; getRanges(...) returns it whenever the whole table must be scanned
+ private static final List<Range> TOTAL_RANGE = Collections.singletonList(new Range());
+
+ // Shared instance handed out by getInstance()
+ private static AccumuloPredicateHandler handler = new AccumuloPredicateHandler();
+ // Hive comparison UDF class name -> CompareOp implementation (populated in the static block)
+ private static Map<String,Class<? extends CompareOp>> compareOps = Maps.newHashMap();
+ // Hive primitive type name (e.g. "int", "string") -> PrimitiveComparison implementation
+ private static Map<String,Class<? extends PrimitiveComparison>> pComparisons = Maps.newHashMap();
+
+ // Want to start sufficiently "high" enough in the iterator stack
+ // NOTE(review): used outside this view; presumably the base priority for the
+ // IteratorSettings generated from predicates -- confirm
+ private static int iteratorCount = 50;
+
+ private static final Logger log = Logger.getLogger(AccumuloPredicateHandler.class);
+ // Register the supported comparison UDFs and primitive column types
+ static {
+ compareOps.put(GenericUDFOPEqual.class.getName(), Equal.class);
+ compareOps.put(GenericUDFOPNotEqual.class.getName(), NotEqual.class);
+ compareOps.put(GenericUDFOPGreaterThan.class.getName(), GreaterThan.class);
+ compareOps.put(GenericUDFOPEqualOrGreaterThan.class.getName(), GreaterThanOrEqual.class);
+ compareOps.put(GenericUDFOPEqualOrLessThan.class.getName(), LessThanOrEqual.class);
+ compareOps.put(GenericUDFOPLessThan.class.getName(), LessThan.class);
+ compareOps.put(UDFLike.class.getName(), Like.class);
+
+ pComparisons.put("bigint", LongCompare.class);
+ pComparisons.put("int", IntCompare.class);
+ pComparisons.put("double", DoubleCompare.class);
+ pComparisons.put("string", StringCompare.class);
+ }
+
+  /**
+   * Returns the shared predicate handler.
+   *
+   * @return the singleton AccumuloPredicateHandler
+   */
+  public static AccumuloPredicateHandler getInstance() {
+    final AccumuloPredicateHandler shared = handler;
+    return shared;
+  }
+
+  /**
+   * Lists the UDFs that can be pushed down as comparisons.
+   *
+   * @return the class names of all UDFs that have a registered {@link CompareOp} implementation
+   */
+  public Set<String> cOpKeyset() {
+    final Set<String> udfClassNames = compareOps.keySet();
+    return udfClassNames;
+  }
+
+  /**
+   * Lists the Hive column types whose values can be compared during pushdown.
+   *
+   * @return the Hive type names that have a registered {@link PrimitiveComparison} implementation
+   */
+  public Set<String> pComparisonKeyset() {
+    final Set<String> typeNames = pComparisons.keySet();
+    return typeNames;
+  }
+
+  /**
+   * Looks up the {@link CompareOp} implementation registered for a UDF.
+   *
+   * @param udfType
+   *          class name of the Hive UDF to look up
+   * @return the CompareOp implementation class registered for {@code udfType}
+   * @throws NoSuchCompareOpException
+   *           if no implementation is registered for {@code udfType}
+   */
+  public Class<? extends CompareOp> getCompareOpClass(String udfType)
+      throws NoSuchCompareOpException {
+    // The registry never holds null values, so a null lookup means "not registered"
+    final Class<? extends CompareOp> opClass = compareOps.get(udfType);
+    if (null == opClass) {
+      throw new NoSuchCompareOpException("Null compare op for specified key: " + udfType);
+    }
+    return opClass;
+  }
+
+  /**
+   * Instantiates the {@link CompareOp} registered for a UDF.
+   *
+   * @param udfType
+   *          class name of the Hive UDF
+   * @param sc
+   *          the search condition, used only to build the type-mismatch error message
+   * @return a new CompareOp instance
+   * @throws NoSuchCompareOpException
+   *           if no implementation is registered for {@code udfType}
+   * @throws SerDeException
+   *           if the implementation cannot be instantiated
+   */
+  public CompareOp getCompareOp(String udfType, IndexSearchCondition sc)
+      throws NoSuchCompareOpException, SerDeException {
+    final Class<? extends CompareOp> opClass = getCompareOpClass(udfType);
+
+    try {
+      return opClass.newInstance();
+    } catch (ClassCastException cce) {
+      final String expr = sc.getComparisonExpr().getExprString();
+      final String foundType = sc.getConstantDesc().getTypeString();
+      final String expectedType = sc.getColumnDesc().getTypeString();
+      throw new SerDeException("Column type mismatch in WHERE clause " + expr + " found type "
+          + foundType + " instead of " + expectedType);
+    } catch (IllegalAccessException iae) {
+      throw new SerDeException("Could not instantiate class for WHERE clause", iae);
+    } catch (InstantiationException ie) {
+      throw new SerDeException("Could not instantiate class for WHERE clause", ie);
+    }
+  }
+
+  /**
+   * Looks up the {@link PrimitiveComparison} implementation registered for a Hive type.
+   *
+   * @param type
+   *          Hive column type name to look up (e.g. "int", "string")
+   * @return the PrimitiveComparison implementation class registered for {@code type}
+   * @throws NoSuchPrimitiveComparisonException
+   *           if no implementation is registered for {@code type}
+   */
+  public Class<? extends PrimitiveComparison> getPrimitiveComparisonClass(String type)
+      throws NoSuchPrimitiveComparisonException {
+    // The registry never holds null values, so a null lookup means "not registered"
+    final Class<? extends PrimitiveComparison> comparisonClass = pComparisons.get(type);
+    if (null == comparisonClass) {
+      throw new NoSuchPrimitiveComparisonException("Null primitive comparison for specified key: "
+          + type);
+    }
+    return comparisonClass;
+  }
+
+  /**
+   * Instantiates the {@link PrimitiveComparison} registered for a Hive type.
+   *
+   * @param type
+   *          Hive column type name
+   * @param sc
+   *          the search condition, used only to build the type-mismatch error message
+   * @return a new PrimitiveComparison instance
+   * @throws NoSuchPrimitiveComparisonException
+   *           if no implementation is registered for {@code type}
+   * @throws SerDeException
+   *           if the implementation cannot be instantiated
+   */
+  public PrimitiveComparison getPrimitiveComparison(String type, IndexSearchCondition sc)
+      throws NoSuchPrimitiveComparisonException, SerDeException {
+    final Class<? extends PrimitiveComparison> comparisonClass = getPrimitiveComparisonClass(type);
+
+    try {
+      return comparisonClass.newInstance();
+    } catch (ClassCastException cce) {
+      final String expr = sc.getComparisonExpr().getExprString();
+      final String foundType = sc.getConstantDesc().getTypeString();
+      final String expectedType = sc.getColumnDesc().getTypeString();
+      throw new SerDeException("Column type mismatch in WHERE clause " + expr + " found type "
+          + foundType + " instead of " + expectedType);
+    } catch (IllegalAccessException iae) {
+      throw new SerDeException("Could not instantiate class for WHERE clause", iae);
+    } catch (InstantiationException ie) {
+      throw new SerDeException("Could not instantiate class for WHERE clause", ie);
+    }
+  }
+
+ private AccumuloPredicateHandler() {
+   // Intentionally empty: construction is restricted to this class.
+ }
+
+ /**
+  * Loop through search conditions and build ranges for predicates involving the rowID column, if
+  * any.
+  *
+  * @param conf
+  *          Configuration holding the Hive column list and the serialized filter expression
+  * @param columnMapper
+  *          Mapping of Hive columns to Accumulo columns for the table
+  * @return the Ranges to scan; TOTAL_RANGE when there is no rowID mapping, no filter expression,
+  *         or range generation produced no result
+  * @throws SerDeException
+  *           declared by the signature; not thrown directly by this method body
+  * @throws IllegalArgumentException
+  *           if the Hive column list is missing, the rowID offset is outside it, or range
+  *           generation returns an unexpected type
+  */
+ public List<Range> getRanges(Configuration conf, ColumnMapper columnMapper) throws SerDeException {
+   if (!columnMapper.hasRowIdMapping()) {
+     return TOTAL_RANGE;
+   }
+
+   int rowIdOffset = columnMapper.getRowIdOffset();
+   String[] hiveColumnNamesArr = conf.getStrings(serdeConstants.LIST_COLUMNS);
+
+   if (null == hiveColumnNamesArr) {
+     throw new IllegalArgumentException("Could not find Hive columns in configuration");
+   }
+
+   // hasRowIdMapping() only says a rowID mapping exists; make sure its offset actually lands inside
+   // the configured column list rather than failing with ArrayIndexOutOfBoundsException.
+   if (rowIdOffset < 0 || rowIdOffset >= hiveColumnNamesArr.length) {
+     throw new IllegalArgumentException("RowID column offset " + rowIdOffset
+         + " is outside the " + hiveColumnNamesArr.length + " configured Hive columns");
+   }
+   String hiveRowIdColumnName = hiveColumnNamesArr[rowIdOffset];
+
+   ExprNodeDesc root = this.getExpression(conf);
+
+   // No expression, therefore scan the whole table
+   if (null == root) {
+     return TOTAL_RANGE;
+   }
+
+   Object result = generateRanges(columnMapper, hiveRowIdColumnName, root);
+
+   if (null == result) {
+     log.info("Calculated null set of ranges, scanning full table");
+     return TOTAL_RANGE;
+   } else if (result instanceof Range) {
+     log.info("Computed a single Range for the query: " + result);
+     return Collections.singletonList((Range) result);
+   } else if (result instanceof List) {
+     log.info("Computed a collection of Ranges for the query: " + result);
+     // The range generator is expected to emit only Ranges inside a List result.
+     @SuppressWarnings("unchecked")
+     List<Range> ranges = (List<Range>) result;
+     return ranges;
+   } else {
+     throw new IllegalArgumentException("Unhandled return from Range generation: " + result);
+   }
+ }
+
+ /**
+  * Walks an {@link ExprNodeDesc} tree with an {@link AccumuloRangeGenerator} to derive Accumulo
+  * Ranges from expressions on the Hive column mapped to the Accumulo rowid.
+  *
+  * @param columnMapper
+  *          Mapping of Hive to Accumulo columns for the query
+  * @param hiveRowIdColumnName
+  *          Name of the Hive column mapped to the Accumulo rowid
+  * @param root
+  *          Root of the ExprNodeDesc tree to traverse (the WHERE clause)
+  * @return the value the walk recorded for {@code root}; getRanges accepts null, a Range, or a
+  *         List of Ranges
+  * @throws RuntimeException
+  *           wrapping any SemanticException raised while walking the tree
+  */
+ protected Object generateRanges(ColumnMapper columnMapper, String hiveRowIdColumnName, ExprNodeDesc root) {
+   HashMap<Node,Object> outputs = new HashMap<Node,Object>();
+   ArrayList<Node> startNodes = new ArrayList<Node>();
+   startNodes.add(root);
+
+   AccumuloRangeGenerator generator = new AccumuloRangeGenerator(handler,
+       columnMapper.getRowIdMapping(), hiveRowIdColumnName);
+   // No rules are registered; NOTE(review): presumably every node therefore falls through to the
+   // range generator as the dispatcher's default processor.
+   GraphWalker walker = new DefaultGraphWalker(new DefaultRuleDispatcher(generator,
+       Collections.<Rule,NodeProcessor> emptyMap(), null));
+
+   try {
+     walker.startWalking(startNodes, outputs);
+   } catch (SemanticException ex) {
+     throw new RuntimeException(ex);
+   }
+   return outputs.get(root);
+ }
+
+ /**
+  * Builds one {@link IteratorSetting} per search condition whose column is not the rowID column.
+  * Predicates on the rowID column are turned into Ranges by getRanges instead.
+  *
+  * @param conf
+  *          Configuration holding the pushdown flag, the Hive column list and the filter expression
+  * @param columnMapper
+  *          Mapping of Hive columns to Accumulo columns for the table
+  * @return a mutable list of iterator settings; empty when iterator pushdown is disabled
+  * @throws SerDeException
+  *           if an IteratorSetting cannot be built for a condition
+  * @throws IllegalArgumentException
+  *           if the Hive column list is missing from the configuration
+  */
+ public List<IteratorSetting> getIterators(Configuration conf, ColumnMapper columnMapper)
+     throws SerDeException {
+   List<IteratorSetting> settings = Lists.newArrayList();
+   if (!conf.getBoolean(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY,
+       AccumuloSerDeParameters.ITERATOR_PUSHDOWN_DEFAULT)) {
+     log.info("Iterator pushdown is disabled for this table");
+     return settings;
+   }
+
+   int rowIdOffset = columnMapper.getRowIdOffset();
+   String[] columnNames = conf.getStrings(serdeConstants.LIST_COLUMNS);
+   if (columnNames == null) {
+     throw new IllegalArgumentException("Could not find Hive columns in configuration");
+   }
+
+   // An offset outside the column list means there is no usable rowID column, so every condition
+   // becomes an iterator.
+   boolean hasRowIdColumn = rowIdOffset >= 0 && rowIdOffset < columnNames.length;
+   String rowIdColumn = hasRowIdColumn ? columnNames[rowIdOffset] : null;
+   List<String> columnNameList = Arrays.asList(columnNames);
+
+   for (IndexSearchCondition condition : getSearchConditions(conf)) {
+     String column = condition.getColumnDesc().getColumn();
+     if (rowIdColumn != null && rowIdColumn.equals(column)) {
+       continue;
+     }
+     // NOTE(review): the cast assumes every non-rowID column maps to a HiveAccumuloColumnMapping.
+     HiveAccumuloColumnMapping mapping = (HiveAccumuloColumnMapping) columnMapper
+         .getColumnMappingForHiveColumn(columnNameList, column);
+     settings.add(toSetting(mapping, condition));
+   }
+
+   if (log.isInfoEnabled()) {
+     log.info("num iterators = " + settings.size());
+   }
+   return settings;
+ }
+
+ /**
+  * Create an IteratorSetting for the right qualifier, constant, CompareOp, and PrimitiveComparison
+  * type.
+  *
+  * Each call increments the instance field {@code iteratorCount} and uses the new value as both the
+  * iterator priority and the suffix of its name. NOTE(review): the counter is never reset here, so
+  * it keeps growing across calls on the same instance; confirm this is intended.
+  *
+  * @param accumuloColumnMapping
+  *          ColumnMapping to filter
+  * @param sc
+  *          IndexSearchCondition
+  * @return IteratorSetting configured for PrimitiveComparisonFilter
+  * @throws SerDeException
+  *           if no PrimitiveComparison or CompareOp is configured for the condition, or either
+  *           cannot be instantiated
+  */
+ public IteratorSetting toSetting(HiveAccumuloColumnMapping accumuloColumnMapping,
+     IndexSearchCondition sc) throws SerDeException {
+   iteratorCount++;
+   final IteratorSetting is = new IteratorSetting(iteratorCount,
+       PrimitiveComparisonFilter.FILTER_PREFIX + iteratorCount, PrimitiveComparisonFilter.class);
+   final String type = sc.getColumnDesc().getTypeString();
+   final String comparisonOpStr = sc.getComparisonOp();
+
+   PushdownTuple tuple;
+   try {
+     tuple = new PushdownTuple(sc, getPrimitiveComparison(type, sc), getCompareOp(comparisonOpStr,
+         sc));
+   } catch (NoSuchPrimitiveComparisonException e) {
+     throw new SerDeException("No configured PrimitiveComparison class for " + type, e);
+   } catch (NoSuchCompareOpException e) {
+     throw new SerDeException("No configured CompareOp class for " + comparisonOpStr, e);
+   }
+
+   is.addOption(PrimitiveComparisonFilter.P_COMPARE_CLASS, tuple.getpCompare().getClass()
+       .getName());
+   is.addOption(PrimitiveComparisonFilter.COMPARE_OPT_CLASS, tuple.getcOpt().getClass().getName());
+   // Decode the Base64 bytes with an explicit charset rather than the platform default.
+   is.addOption(PrimitiveComparisonFilter.CONST_VAL,
+       new String(Base64.encodeBase64(tuple.getConstVal()),
+           java.nio.charset.Charset.forName("UTF-8")));
+   is.addOption(PrimitiveComparisonFilter.COLUMN, accumuloColumnMapping.serialize());
+
+   return is;
+ }
+
+ /**
+  * Reads and deserializes the pushed-down filter expression stored in {@code conf}.
+  *
+  * @param conf
+  *          Configuration that may carry TableScanDesc.FILTER_EXPR_CONF_STR
+  * @return the deserialized expression, or null when no filter expression is set
+  */
+ public ExprNodeDesc getExpression(Configuration conf) {
+   String serialized = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+   return serialized == null ? null : Utilities.deserializeExpression(serialized);
+ }
+
+ /**
+  * Extracts search conditions from the filter expression stored in {@code conf}.
+  *
+  * @param conf
+  *          Configuration
+  * @return a mutable list of IndexSearchConditions; empty when no filter expression is set
+  * @throws RuntimeException
+  *           if the analyzer leaves a residual predicate it could not turn into conditions
+  */
+ public List<IndexSearchCondition> getSearchConditions(Configuration conf) {
+   final List<IndexSearchCondition> conditions = Lists.newArrayList();
+   ExprNodeDesc expression = getExpression(conf);
+   if (expression != null) {
+     ExprNodeDesc leftover = newAnalyzer(conf).analyzePredicate(expression, conditions);
+     if (leftover != null) {
+       throw new RuntimeException("Unexpected residual predicate: " + leftover.getExprString());
+     }
+   }
+   return conditions;
+ }
+
+ /**
+  * Splits a predicate into the part the analyzer can push down and the residual remainder.
+  *
+  * @param conf
+  *          Configuration
+  * @param desc
+  *          predicate expression node.
+  * @return DecomposedPredicate holding the translated search conditions and the residual
+  *         predicate, or null when no search condition could be extracted
+  */
+ public DecomposedPredicate decompose(Configuration conf, ExprNodeDesc desc) {
+   IndexPredicateAnalyzer analyzer = newAnalyzer(conf);
+   List<IndexSearchCondition> conditions = new ArrayList<IndexSearchCondition>();
+   ExprNodeDesc residual = analyzer.analyzePredicate(desc, conditions);
+
+   if (conditions.isEmpty()) {
+     if (log.isInfoEnabled()) {
+       log.info("nothing to decompose. Returning");
+     }
+     return null;
+   }
+
+   DecomposedPredicate result = new DecomposedPredicate();
+   result.pushedPredicate = analyzer.translateSearchConditions(conditions);
+   // NOTE(review): assumes any residual is a generic function node (or null); other node kinds
+   // would fail this cast.
+   result.residualPredicate = (ExprNodeGenericFuncDesc) residual;
+   return result;
+ }
+
+ /**
+ * Build an analyzer that allows comparison opts from compareOpts map, and all columns from table
+ * definition.
+ */
+ private IndexPredicateAnalyzer newAnalyzer(Configuration conf) {
+ IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
+ analyzer.clearAllowedColumnNames();
+ for (String op : cOpKeyset()) {
+ analyzer.addComparisonOp(op);
+ }
+
+ String[] hiveColumnNames = conf.getStrings(serdeConstants.LIST_COLUMNS);
+ for (String col : hiveColumnNames) {
+ analyzer.allowColumnName(col);
+ }
+
+ return analyzer;
+ }
+}