You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2014/08/20 00:41:13 UTC
svn commit: r1619005 [1/9] - in /hive/trunk: ./ accumulo-handler/
accumulo-handler/src/ accumulo-handler/src/java/
accumulo-handler/src/java/org/ accumulo-handler/src/java/org/apache/
accumulo-handler/src/java/org/apache/hadoop/ accumulo-handler/src/ja...
Author: khorgath
Date: Tue Aug 19 22:41:10 2014
New Revision: 1619005
URL: http://svn.apache.org/r1619005
Log:
HIVE-7068 : Integrate AccumuloStorageHandler (Josh Elser, reviewed by Navis, Nick Dimiduk & Sushanth Sowmyan)
Added:
hive/trunk/accumulo-handler/
hive/trunk/accumulo-handler/pom.xml
hive/trunk/accumulo-handler/src/
hive/trunk/accumulo-handler/src/java/
hive/trunk/accumulo-handler/src/java/org/
hive/trunk/accumulo-handler/src/java/org/apache/
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveConstants.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveRow.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/LazyAccumuloMap.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/LazyAccumuloRow.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnEncoding.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapper.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapping.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMappingFactory.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloColumnMapping.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloMapColumnMapping.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloRowIdColumnMapping.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveColumn.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/InvalidColumnMappingException.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloRecordReader.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchCompareOpException.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchPrimitiveComparisonException.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PushdownTuple.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/CompareOp.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/DoubleCompare.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Equal.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThan.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThanOrEqual.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/IntCompare.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThan.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThanOrEqual.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Like.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LongCompare.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/NotEqual.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/PrimitiveComparison.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/StringCompare.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/package-info.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/package-info.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloCompositeRowId.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowIdFactory.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloRowSerializer.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDe.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDeParameters.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/CompositeAccumuloRowIdFactory.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/DefaultAccumuloRowIdFactory.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/TooManyAccumuloColumnsException.java
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/TooManyHiveColumnsException.java
hive/trunk/accumulo-handler/src/test/
hive/trunk/accumulo-handler/src/test/org/
hive/trunk/accumulo-handler/src/test/org/apache/
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloConnectionParameters.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloHiveRow.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestLazyAccumuloMap.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestLazyAccumuloRow.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/TestColumnEncoding.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/TestColumnMapper.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/TestColumnMappingFactory.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/TestHiveAccumuloColumnMapping.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/TestHiveRowIdColumnMapping.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTypes.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestPrimitiveComparisonFilter.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/compare/
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/compare/TestDoubleCompare.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/compare/TestIntCompare.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/compare/TestLongComparison.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/compare/TestStringCompare.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/serde/
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/serde/DelimitedAccumuloRowIdFactory.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/serde/FirstCharAccumuloCompositeRowId.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/serde/TestAccumuloRowSerializer.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/serde/TestAccumuloSerDe.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/serde/TestAccumuloSerDeParameters.java
hive/trunk/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/serde/TestDefaultAccumuloRowIdFactory.java
hive/trunk/accumulo-handler/src/test/queries/
hive/trunk/accumulo-handler/src/test/queries/positive/
hive/trunk/accumulo-handler/src/test/queries/positive/accumulo_custom_key.q
hive/trunk/accumulo-handler/src/test/queries/positive/accumulo_custom_key2.q
hive/trunk/accumulo-handler/src/test/queries/positive/accumulo_joins.q
hive/trunk/accumulo-handler/src/test/queries/positive/accumulo_predicate_pushdown.q
hive/trunk/accumulo-handler/src/test/queries/positive/accumulo_queries.q
hive/trunk/accumulo-handler/src/test/queries/positive/accumulo_single_sourced_multi_insert.q
hive/trunk/accumulo-handler/src/test/results/
hive/trunk/accumulo-handler/src/test/results/positive/
hive/trunk/accumulo-handler/src/test/results/positive/accumulo_custom_key.q.out
hive/trunk/accumulo-handler/src/test/results/positive/accumulo_custom_key2.q.out
hive/trunk/accumulo-handler/src/test/results/positive/accumulo_joins.q.out
hive/trunk/accumulo-handler/src/test/results/positive/accumulo_predicate_pushdown.q.out
hive/trunk/accumulo-handler/src/test/results/positive/accumulo_queries.q.out
hive/trunk/accumulo-handler/src/test/results/positive/accumulo_single_sourced_multi_insert.q.out
hive/trunk/accumulo-handler/src/test/templates/
hive/trunk/accumulo-handler/src/test/templates/TestAccumuloCliDriver.vm
hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/accumulo/
hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/accumulo/AccumuloQTestUtil.java
hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/accumulo/AccumuloTestSetup.java
Modified:
hive/trunk/itests/qtest/pom.xml
hive/trunk/itests/util/pom.xml
hive/trunk/packaging/pom.xml
hive/trunk/pom.xml
Added: hive/trunk/accumulo-handler/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/pom.xml?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/pom.xml (added)
+++ hive/trunk/accumulo-handler/pom.xml Tue Aug 19 22:41:10 2014
@@ -0,0 +1,158 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive</artifactId>
+ <version>0.14.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>hive-accumulo-handler</artifactId>
+ <packaging>jar</packaging>
+ <name>Hive Accumulo Handler</name>
+
+ <properties>
+ <hive.path.to.root>..</hive.path.to.root>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-fate</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-start</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-trace</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-service</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-shims</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>hadoop-1</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>${hadoop-20S.version}</version>
+ <optional>true</optional>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>hadoop-2</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop-23.version}</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop-23.version}</version>
+ <optional>true</optional>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+ <build>
+ <sourceDirectory>${basedir}/src/java</sourceDirectory>
+ <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *
+ */
+public class AccumuloConnectionParameters {
+ public static final String USER_NAME = "accumulo.user.name";
+ public static final String USER_PASS = "accumulo.user.pass";
+ public static final String ZOOKEEPERS = "accumulo.zookeepers";
+ public static final String INSTANCE_NAME = "accumulo.instance.name";
+ public static final String TABLE_NAME = "accumulo.table.name";
+
+ public static final String USE_MOCK_INSTANCE = "accumulo.mock.instance";
+
+ protected Configuration conf;
+ protected boolean useMockInstance = false;
+
+ public AccumuloConnectionParameters(Configuration conf) {
+ // TableDesc#getDeserializer will ultimately instantiate the AccumuloSerDe with a null
+ // Configuration
+ // We have to accept this and just fail late if data is attempted to be pulled from the
+ // Configuration
+ this.conf = conf;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public String getAccumuloUserName() {
+ Preconditions.checkNotNull(conf);
+ return conf.get(USER_NAME);
+ }
+
+ public String getAccumuloPassword() {
+ Preconditions.checkNotNull(conf);
+ return conf.get(USER_PASS);
+ }
+
+ public String getAccumuloInstanceName() {
+ Preconditions.checkNotNull(conf);
+ return conf.get(INSTANCE_NAME);
+ }
+
+ public String getZooKeepers() {
+ Preconditions.checkNotNull(conf);
+ return conf.get(ZOOKEEPERS);
+ }
+
+ public String getAccumuloTableName() {
+ Preconditions.checkNotNull(conf);
+ return conf.get(TABLE_NAME);
+ }
+
+ public boolean useMockInstance() {
+ Preconditions.checkNotNull(conf);
+ return conf.getBoolean(USE_MOCK_INSTANCE, false);
+ }
+
+ public Instance getInstance() {
+ String instanceName = getAccumuloInstanceName();
+
+ // Fail with a good message
+ if (null == instanceName) {
+ throw new IllegalArgumentException("Accumulo instance name must be provided in hiveconf using " + INSTANCE_NAME);
+ }
+
+ if (useMockInstance()) {
+ return new MockInstance(instanceName);
+ }
+
+ String zookeepers = getZooKeepers();
+
+ // Fail with a good message
+ if (null == zookeepers) {
+ throw new IllegalArgumentException("ZooKeeper quorum string must be provided in hiveconf using " + ZOOKEEPERS);
+ }
+
+ return new ZooKeeperInstance(instanceName, zookeepers);
+ }
+
+ public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
+ Instance inst = getInstance();
+ return getConnector(inst);
+ }
+
+ public Connector getConnector(Instance inst) throws AccumuloException, AccumuloSecurityException {
+ String username = getAccumuloUserName(), password = getAccumuloPassword();
+
+ // Fail with a good message
+ if (null == username) {
+ throw new IllegalArgumentException("Accumulo user name must be provided in hiveconf using " + USER_NAME);
+ }
+ if (null == password) {
+ throw new IllegalArgumentException("Accumulo password must be provided in hiveconf using " + USER_PASS);
+ }
+
+ return inst.getConnector(username, new PasswordToken(password));
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveConstants.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveConstants.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveConstants.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveConstants.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo;
+
+import java.nio.charset.Charset;
+
+/**
+ *
+ */
/**
 * Constant characters and escape/regex strings shared by the Accumulo storage
 * handler when parsing and escaping column-mapping specifications.
 */
public class AccumuloHiveConstants {
  // Special column-mapping token that refers to the Accumulo row id.
  public static final String ROWID = ":rowID";
  public static final char COLON = ':', COMMA = ',', ESCAPE = '\\', POUND = '#', ASTERISK = '*';

  // A colon preceded by the escape character: "\:"
  public static final String ESCAPED_COLON = "" + ESCAPE + COLON;

  // Regex form of the above -- the backslash itself must be escaped: "\\:"
  public static final String ESCAPED_COLON_REGEX = "" + ESCAPE + ESCAPE + COLON;

  // An asterisk preceded by the escape character: "\*"
  public static final String ESCAPED_ASTERISK = "" + ESCAPE + ASTERISK;

  // Regex form: escape the backslash AND escape the asterisk: "\\\*"
  // NOTE(review): "ASERTISK" is a typo in the original name, kept unchanged
  // for API compatibility.
  public static final String ESCAPED_ASERTISK_REGEX = "" + ESCAPE + ESCAPE + ESCAPE + ASTERISK;

  public static final Charset UTF_8 = Charset.forName("UTF-8");
}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveRow.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveRow.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveRow.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloHiveRow.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,230 @@
+package org.apache.hadoop.hive.accumulo;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Holds column tuples for rowID. Each tuple contains column family label, qualifier label, and byte
+ * array value.
+ */
+public class AccumuloHiveRow implements Writable {
+
+ private String rowId;
+ private List<ColumnTuple> tuples = new ArrayList<ColumnTuple>();
+
+ public AccumuloHiveRow() {}
+
+ public AccumuloHiveRow(String rowId) {
+ this.rowId = rowId;
+ }
+
+ public void setRowId(String rowId) {
+ this.rowId = rowId;
+ }
+
+ public List<ColumnTuple> getTuples() {
+ return Collections.unmodifiableList(tuples);
+ }
+
+ /**
+ * @return true if this instance has a tuple containing fam and qual, false otherwise.
+ */
+ public boolean hasFamAndQual(Text fam, Text qual) {
+ for (ColumnTuple tuple : tuples) {
+ if (tuple.getCf().equals(fam) && tuple.getCq().equals(qual)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @return byte [] value for first tuple containing fam and qual or null if no match.
+ */
+ public byte[] getValue(Text fam, Text qual) {
+ for (ColumnTuple tuple : tuples) {
+ if (tuple.getCf().equals(fam) && tuple.getCq().equals(qual)) {
+ return tuple.getValue();
+ }
+ }
+ return null;
+ }
+
+ public String getRowId() {
+ return rowId;
+ }
+
+ public void clear() {
+ this.rowId = null;
+ this.tuples = new ArrayList<ColumnTuple>();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder("AccumuloHiveRow{");
+ builder.append("rowId='").append(rowId).append("', tuples: ");
+ for (ColumnTuple tuple : tuples) {
+ builder.append(tuple.toString());
+ builder.append("\n");
+ }
+ return builder.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof AccumuloHiveRow) {
+ AccumuloHiveRow other = (AccumuloHiveRow) o;
+ if (null == rowId) {
+ if (null != other.rowId) {
+ return false;
+ }
+ } else if (!rowId.equals(other.rowId)) {
+ return false;
+ }
+
+ return tuples.equals(other.tuples);
+ }
+
+ return false;
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ if (null != rowId) {
+ dataOutput.writeBoolean(true);
+ dataOutput.writeUTF(rowId);
+ } else {
+ dataOutput.writeBoolean(false);
+ }
+ int size = tuples.size();
+ dataOutput.writeInt(size);
+ for (ColumnTuple tuple : tuples) {
+ Text cf = tuple.getCf(), cq = tuple.getCq();
+ dataOutput.writeInt(cf.getLength());
+ dataOutput.write(cf.getBytes(), 0, cf.getLength());
+ dataOutput.writeInt(cq.getLength());
+ dataOutput.write(cq.getBytes(), 0, cq.getLength());
+ byte[] value = tuple.getValue();
+ dataOutput.writeInt(value.length);
+ dataOutput.write(value);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ if (dataInput.readBoolean()) {
+ rowId = dataInput.readUTF();
+ }
+ int size = dataInput.readInt();
+ for (int i = 0; i < size; i++) {
+ int cfLength = dataInput.readInt();
+ byte[] cfData = new byte[cfLength];
+ dataInput.readFully(cfData, 0, cfLength);
+ Text cf = new Text(cfData);
+ int cqLength = dataInput.readInt();
+ byte[] cqData = new byte[cqLength];
+ dataInput.readFully(cqData, 0, cqLength);
+ Text cq = new Text(cqData);
+ int valSize = dataInput.readInt();
+ byte[] val = new byte[valSize];
+ for (int j = 0; j < valSize; j++) {
+ val[j] = dataInput.readByte();
+ }
+ tuples.add(new ColumnTuple(cf, cq, val));
+ }
+ }
+
+ public void add(String cf, String qual, byte[] val) {
+ Preconditions.checkNotNull(cf);
+ Preconditions.checkNotNull(qual);
+ Preconditions.checkNotNull(val);
+
+ add(new Text(cf), new Text(qual), val);
+ }
+
+ public void add(Text cf, Text qual, byte[] val) {
+ Preconditions.checkNotNull(cf);
+ Preconditions.checkNotNull(qual);
+ Preconditions.checkNotNull(val);
+
+ tuples.add(new ColumnTuple(cf, qual, val));
+ }
+
+ public static class ColumnTuple {
+ private final Text cf;
+ private final Text cq;
+ private final byte[] value;
+
+ public ColumnTuple(Text cf, Text cq, byte[] value) {
+ this.value = value;
+ this.cf = cf;
+ this.cq = cq;
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ public Text getCf() {
+ return cf;
+ }
+
+ public Text getCq() {
+ return cq;
+ }
+
+ @Override
+ public int hashCode() {
+ HashCodeBuilder hcb = new HashCodeBuilder(9683, 68783);
+ return hcb.append(cf).append(cq).append(value).toHashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof ColumnTuple) {
+ ColumnTuple other = (ColumnTuple) o;
+ if (null == cf) {
+ if (null != other.cf) {
+ return false;
+ }
+ } else if (!cf.equals(other.cf)) {
+ return false;
+ }
+
+ if (null == cq) {
+ if (null != other.cq) {
+ return false;
+ }
+ } else if (!cq.equals(other.cq)) {
+ return false;
+ }
+
+ if (null == value) {
+ if (null != other.value) {
+ return false;
+ }
+ }
+
+ return Arrays.equals(value, other.value);
+ }
+
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return cf + " " + cq + " " + new String(value);
+ }
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,341 @@
+package org.apache.hadoop.hive.accumulo;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.fate.Fate;
+import org.apache.accumulo.start.Main;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableInputFormat;
+import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableOutputFormat;
+import org.apache.hadoop.hive.accumulo.predicate.AccumuloPredicateHandler;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create table mapping to Accumulo for Hive. Handle predicate pushdown if necessary.
+ */
+public class AccumuloStorageHandler extends DefaultStorageHandler implements HiveMetaHook,
+    HiveStoragePredicateHandler {
+  private static final Logger log = LoggerFactory.getLogger(AccumuloStorageHandler.class);
+  private static final String DEFAULT_PREFIX = "default";
+
+  // Singleton handler that translates Hive predicates into Accumulo iterators/ranges
+  protected AccumuloPredicateHandler predicateHandler = AccumuloPredicateHandler.getInstance();
+  // Connection parameters derived from the Configuration; (re)built in setConf
+  protected AccumuloConnectionParameters connectionParams;
+  protected Configuration conf;
+
+  /**
+   * Push down table properties into the JobConf.
+   *
+   * @param desc
+   *          Hive table description
+   * @param jobProps
+   *          Properties that will be added to the JobConf by Hive
+   */
+  @Override
+  public void configureTableJobProperties(TableDesc desc, Map<String,String> jobProps) {
+    // Should not be getting invoked, configureInputJobProperties or configureOutputJobProperties
+    // should be invoked instead. Delegate to both so the union of properties is still applied.
+    configureInputJobProperties(desc, jobProps);
+    configureOutputJobProperties(desc, jobProps);
+  }
+
+  /**
+   * Computes the name of the Accumulo table backing the given Hive table, checking, in order:
+   * the TABLE_NAME entry in TBLPROPERTIES, the same entry in SERDEPROPERTIES, and finally the
+   * Hive table name itself (qualified by database name unless it is the default database).
+   *
+   * @param table
+   *          Hive metastore table
+   * @return the Accumulo table name to use
+   */
+  protected String getTableName(Table table) throws MetaException {
+    // Use TBLPROPERTIES
+    String tableName = table.getParameters().get(AccumuloSerDeParameters.TABLE_NAME);
+
+    if (null != tableName) {
+      return tableName;
+    }
+
+    // Then try SERDEPROPERTIES
+    tableName = table.getSd().getSerdeInfo().getParameters()
+        .get(AccumuloSerDeParameters.TABLE_NAME);
+
+    if (null != tableName) {
+      return tableName;
+    }
+
+    // Use the hive table name, ignoring the default database
+    if (DEFAULT_PREFIX.equals(table.getDbName())) {
+      return table.getTableName();
+    } else {
+      return table.getDbName() + "." + table.getTableName();
+    }
+  }
+
+  /**
+   * Computes the name of the Accumulo table backing the given Hive table description, preferring
+   * an explicit TABLE_NAME property and falling back to the metastore table name with any
+   * leading "default." database prefix stripped.
+   *
+   * @param tableDesc
+   *          Hive table description
+   * @return the Accumulo table name to use
+   * @throws IllegalArgumentException
+   *           if no table name can be determined from the TableDesc
+   */
+  protected String getTableName(TableDesc tableDesc) {
+    Properties props = tableDesc.getProperties();
+    String tableName = props.getProperty(AccumuloSerDeParameters.TABLE_NAME);
+    if (null != tableName) {
+      return tableName;
+    }
+
+    tableName = props.getProperty(hive_metastoreConstants.META_TABLE_NAME);
+
+    // Guard against a TableDesc that carries neither an explicit Accumulo table name nor the
+    // metastore table name. Previously this surfaced as an opaque NullPointerException from
+    // the startsWith call below.
+    if (null == tableName) {
+      throw new IllegalArgumentException(
+          "Could not determine Accumulo table name from TableDesc: " + tableDesc);
+    }
+
+    if (tableName.startsWith(DEFAULT_PREFIX + ".")) {
+      return tableName.substring(DEFAULT_PREFIX.length() + 1);
+    }
+
+    return tableName;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    connectionParams = new AccumuloConnectionParameters(conf);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public Class<? extends SerDe> getSerDeClass() {
+    return AccumuloSerDe.class;
+  }
+
+  @Override
+  public HiveMetaHook getMetaHook() {
+    return this;
+  }
+
+  @Override
+  public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException {
+    // No Accumulo-specific authorization; Hive falls back to its default behavior
+    return null;
+  }
+
+  /**
+   * Copies the table properties relevant to reading from Accumulo (column mappings, table name,
+   * iterator pushdown, storage type and scan authorizations) into the job properties.
+   */
+  @Override
+  public void configureInputJobProperties(TableDesc tableDesc, Map<String,String> jobProperties) {
+    Properties props = tableDesc.getProperties();
+
+    // NOTE(review): getProperty may return null when COLUMN_MAPPINGS was never set; the null
+    // value is stored as-is, matching historical behavior (validation happens in the SerDe).
+    jobProperties.put(AccumuloSerDeParameters.COLUMN_MAPPINGS,
+        props.getProperty(AccumuloSerDeParameters.COLUMN_MAPPINGS));
+
+    // getTableName(TableDesc) already prefers an explicit TABLE_NAME property, so the
+    // previously-duplicated property lookup here was redundant.
+    jobProperties.put(AccumuloSerDeParameters.TABLE_NAME, getTableName(tableDesc));
+
+    String useIterators = props.getProperty(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY);
+    if (useIterators != null) {
+      // Fail fast on anything other than an explicit true/false
+      if (!useIterators.equalsIgnoreCase("true") && !useIterators.equalsIgnoreCase("false")) {
+        throw new IllegalArgumentException("Expected value of true or false for "
+            + AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY);
+      }
+
+      jobProperties.put(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY, useIterators);
+    }
+
+    String storageType = props.getProperty(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE);
+    if (null != storageType) {
+      jobProperties.put(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE, storageType);
+    }
+
+    String authValue = props.getProperty(AccumuloSerDeParameters.AUTHORIZATIONS_KEY);
+    if (null != authValue) {
+      jobProperties.put(AccumuloSerDeParameters.AUTHORIZATIONS_KEY, authValue);
+    }
+
+    log.info("Computed input job properties of " + jobProperties);
+  }
+
+  /**
+   * Copies the table properties relevant to writing to Accumulo (column mappings, table name,
+   * storage type and visibility label) into the job properties.
+   */
+  @Override
+  public void configureOutputJobProperties(TableDesc tableDesc, Map<String,String> jobProperties) {
+    Properties props = tableDesc.getProperties();
+    // Adding these job properties will make them available to the OutputFormat in checkOutputSpecs
+    jobProperties.put(AccumuloSerDeParameters.COLUMN_MAPPINGS,
+        props.getProperty(AccumuloSerDeParameters.COLUMN_MAPPINGS));
+
+    // getTableName(TableDesc) already prefers an explicit TABLE_NAME property
+    jobProperties.put(AccumuloSerDeParameters.TABLE_NAME, getTableName(tableDesc));
+
+    if (props.containsKey(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE)) {
+      jobProperties.put(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE,
+          props.getProperty(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE));
+    }
+
+    if (props.containsKey(AccumuloSerDeParameters.VISIBILITY_LABEL_KEY)) {
+      jobProperties.put(AccumuloSerDeParameters.VISIBILITY_LABEL_KEY,
+          props.getProperty(AccumuloSerDeParameters.VISIBILITY_LABEL_KEY));
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return HiveAccumuloTableInputFormat.class;
+  }
+
+  @Override
+  @SuppressWarnings("rawtypes")
+  public Class<? extends OutputFormat> getOutputFormatClass() {
+    return HiveAccumuloTableOutputFormat.class;
+  }
+
+  /**
+   * Validates the Hive table definition against Accumulo and, for managed (non-EXTERNAL) tables,
+   * creates the backing Accumulo table. EXTERNAL tables must already exist in Accumulo; managed
+   * tables must not.
+   *
+   * @throws MetaException
+   *           on invalid definitions or any Accumulo failure
+   */
+  @Override
+  public void preCreateTable(Table table) throws MetaException {
+    boolean isExternal = isExternalTable(table);
+    if (table.getSd().getLocation() != null) {
+      throw new MetaException("Location can't be specified for Accumulo");
+    }
+
+    Map<String,String> serdeParams = table.getSd().getSerdeInfo().getParameters();
+    String columnMapping = serdeParams.get(AccumuloSerDeParameters.COLUMN_MAPPINGS);
+    if (columnMapping == null) {
+      throw new MetaException(AccumuloSerDeParameters.COLUMN_MAPPINGS
+          + " missing from SERDEPROPERTIES");
+    }
+
+    try {
+      String tblName = getTableName(table);
+      Connector connector = connectionParams.getConnector();
+      TableOperations tableOpts = connector.tableOperations();
+
+      // Attempt to create the table, taking EXTERNAL into consideration
+      if (!tableOpts.exists(tblName)) {
+        if (!isExternal) {
+          tableOpts.create(tblName);
+        } else {
+          throw new MetaException("Accumulo table " + tblName
+              + " doesn't exist even though declared external");
+        }
+      } else {
+        if (!isExternal) {
+          throw new MetaException("Table " + tblName
+              + " already exists in Accumulo. Use CREATE EXTERNAL TABLE to register with Hive.");
+        }
+      }
+    } catch (AccumuloSecurityException e) {
+      throw new MetaException(StringUtils.stringifyException(e));
+    } catch (TableExistsException e) {
+      throw new MetaException(StringUtils.stringifyException(e));
+    } catch (AccumuloException e) {
+      throw new MetaException(StringUtils.stringifyException(e));
+    }
+  }
+
+  protected boolean isExternalTable(Table table) {
+    return MetaStoreUtils.isExternalTable(table);
+  }
+
+  @Override
+  public void rollbackCreateTable(Table table) throws MetaException {
+    // Same as commitDropTable where we always delete the data (accumulo table)
+    commitDropTable(table, true);
+  }
+
+  @Override
+  public void commitCreateTable(Table table) throws MetaException {
+    // do nothing
+  }
+
+  /**
+   * For managed tables with deleteData set, drops the backing Accumulo table if it exists.
+   * EXTERNAL tables are never touched.
+   */
+  @Override
+  public void commitDropTable(Table table, boolean deleteData) throws MetaException {
+    String tblName = getTableName(table);
+    if (!isExternalTable(table)) {
+      try {
+        if (deleteData) {
+          TableOperations tblOpts = connectionParams.getConnector().tableOperations();
+          if (tblOpts.exists(tblName)) {
+            tblOpts.delete(tblName);
+          }
+        }
+      } catch (AccumuloException e) {
+        throw new MetaException(StringUtils.stringifyException(e));
+      } catch (AccumuloSecurityException e) {
+        throw new MetaException(StringUtils.stringifyException(e));
+      } catch (TableNotFoundException e) {
+        throw new MetaException(StringUtils.stringifyException(e));
+      }
+    }
+  }
+
+  @Override
+  public void preDropTable(Table table) throws MetaException {
+    // do nothing
+  }
+
+  @Override
+  public void rollbackDropTable(Table table) throws MetaException {
+    // do nothing
+  }
+
+  /**
+   * Splits the given predicate into the portion Accumulo can evaluate server-side (via
+   * iterators) and the residual Hive must still evaluate, unless iterator pushdown is disabled
+   * on the SerDe.
+   *
+   * @return the decomposed predicate, or null when pushdown is disabled
+   */
+  @Override
+  public DecomposedPredicate decomposePredicate(JobConf conf, Deserializer deserializer,
+      ExprNodeDesc desc) {
+    if (!(deserializer instanceof AccumuloSerDe)) {
+      throw new RuntimeException("Expected an AccumuloSerDe but got "
+          + deserializer.getClass().getName());
+    }
+
+    AccumuloSerDe serDe = (AccumuloSerDe) deserializer;
+    if (serDe.getIteratorPushdown()) {
+      return predicateHandler.decompose(conf, desc);
+    } else {
+      log.info("Set to ignore Accumulo iterator pushdown, skipping predicate handler.");
+      return null;
+    }
+  }
+
+  /**
+   * Ships the Accumulo/ZooKeeper client jars (and any jars required by a custom rowId factory)
+   * with the job. Failures here are logged rather than rethrown, preserving best-effort
+   * semantics: the job may still succeed if the jars are available on the cluster.
+   */
+  @Override
+  public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+    try {
+      Utils.addDependencyJars(jobConf, Tracer.class, Fate.class, Connector.class, Main.class,
+          ZooKeeper.class, AccumuloStorageHandler.class);
+    } catch (IOException e) {
+      log.error("Could not add necessary Accumulo dependencies to classpath", e);
+    }
+
+    Properties tblProperties = tableDesc.getProperties();
+    AccumuloSerDeParameters serDeParams = null;
+    try {
+      serDeParams = new AccumuloSerDeParameters(jobConf, tblProperties, AccumuloSerDe.class.getName());
+    } catch (SerDeException e) {
+      log.error("Could not instantiate AccumuloSerDeParameters", e);
+      return;
+    }
+
+    try {
+      serDeParams.getRowIdFactory().addDependencyJars(jobConf);
+    } catch (IOException e) {
+      log.error("Could not add necessary dependencies for " + serDeParams.getRowIdFactory().getClass(), e);
+    }
+  }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/LazyAccumuloMap.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/LazyAccumuloMap.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/LazyAccumuloMap.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/LazyAccumuloMap.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.accumulo.AccumuloHiveRow.ColumnTuple;
+import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloMapColumnMapping;
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyMap;
+import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Charsets;
+
+/**
+ * A Hive Map created from some collection of Key-Values from one to many column families with one
+ * to many column qualifiers.
+ */
+public class LazyAccumuloMap extends LazyMap {
+
+ // Source of the key-value tuples for this map; set via init()
+ protected AccumuloHiveRow sourceRow;
+ // Describes which column family / qualifier prefix populates this map,
+ // and how keys and values are encoded (string vs. binary)
+ protected HiveAccumuloMapColumnMapping columnMapping;
+
+ public LazyAccumuloMap(LazyMapObjectInspector oi) {
+ super(oi);
+ }
+
+ /**
+ * (Re)binds this lazy map to a row and column mapping. Marks the map unparsed so the
+ * next accessor call triggers parse().
+ */
+ public void init(AccumuloHiveRow row, HiveAccumuloMapColumnMapping columnMapping) {
+ this.sourceRow = row;
+ this.columnMapping = columnMapping;
+
+ this.setParsed(false);
+ }
+
+ /**
+ * Materializes cachedMap from the tuples in sourceRow that match this map's column family
+ * and qualifier prefix. Keys are the qualifier suffix (after the prefix), values are the
+ * Accumulo cell values; both are wrapped in lazy objects honoring the configured encoding.
+ */
+ protected void parse() {
+ // Reuse the superclass's cachedMap across re-inits to avoid reallocating
+ if (null == this.cachedMap) {
+ this.cachedMap = new LinkedHashMap<Object,Object>();
+ } else {
+ this.cachedMap.clear();
+ }
+
+ LazyMapObjectInspector lazyMoi = getInspector();
+
+ Text cf = new Text(columnMapping.getColumnFamily());
+ for (ColumnTuple tuple : sourceRow.getTuples()) {
+ String cq = tuple.getCq().toString();
+
+ if (!cf.equals(tuple.getCf()) || !cq.startsWith(columnMapping.getColumnQualifierPrefix())) {
+ // A column family or qualifier we don't want to include in the map
+ continue;
+ }
+
+ // Because we append the cq prefix when serializing the column
+ // we should also remove it when pulling it from Accumulo
+ cq = cq.substring(columnMapping.getColumnQualifierPrefix().length());
+
+ // Keys are always primitive, respect the binary
+ LazyPrimitive<? extends ObjectInspector,? extends Writable> key = LazyFactory
+ .createLazyPrimitiveClass((PrimitiveObjectInspector) lazyMoi.getMapKeyObjectInspector(),
+ ColumnEncoding.BINARY == columnMapping.getKeyEncoding());
+
+ // Re-encode the qualifier suffix as UTF-8 bytes for the lazy key
+ ByteArrayRef keyRef = new ByteArrayRef();
+ keyRef.setData(cq.getBytes(Charsets.UTF_8));
+ key.init(keyRef, 0, keyRef.getData().length);
+
+ // Value can be anything, use the obj inspector and respect binary
+ LazyObject<?> value = LazyFactory.createLazyObject(lazyMoi.getMapValueObjectInspector(),
+ ColumnEncoding.BINARY == columnMapping.getValueEncoding());
+
+ ByteArrayRef valueRef = new ByteArrayRef();
+ valueRef.setData(tuple.getValue());
+ value.init(valueRef, 0, valueRef.getData().length);
+
+ cachedMap.put(key, value);
+ }
+
+ this.setParsed(true);
+ }
+
+ /**
+ * Get the value in the map for the given key.
+ *
+ * @param key
+ * The key, a column qualifier, from the map
+ * @return The object in the map at the given key
+ */
+ @Override
+ public Object getMapValueElement(Object key) {
+ if (!getParsed()) {
+ parse();
+ }
+
+ // Linear scan: cachedMap is keyed by LazyPrimitive wrappers, not by the raw
+ // writable, so we compare each entry's unwrapped writable against the lookup key
+ for (Map.Entry<Object,Object> entry : cachedMap.entrySet()) {
+ LazyPrimitive<?,?> lazyKey = (LazyPrimitive<?,?>) entry.getKey();
+
+ // getWritableObject() will convert LazyPrimitive to actual primitive
+ // writable objects.
+ Object keyI = lazyKey.getWritableObject();
+ if (keyI == null) {
+ continue;
+ }
+ if (keyI.equals(key)) {
+ // Got a match, return the value
+ LazyObject<?> v = (LazyObject<?>) entry.getValue();
+ return v == null ? v : v.getObject();
+ }
+ }
+
+ // No tuple matched the requested qualifier
+ return null;
+ }
+
+ @Override
+ public Map<Object,Object> getMap() {
+ if (!getParsed()) {
+ parse();
+ }
+ return cachedMap;
+ }
+
+ @Override
+ public int getMapSize() {
+ if (!getParsed()) {
+ parse();
+ }
+ return cachedMap.size();
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/LazyAccumuloRow.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/LazyAccumuloRow.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/LazyAccumuloRow.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/LazyAccumuloRow.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,140 @@
+package org.apache.hadoop.hive.accumulo;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMapping;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloMapColumnMapping;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloRowIdColumnMapping;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloRowIdFactory;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ *
+ * Parses column tuples in each AccumuloHiveRow and creates Lazy objects for each field.
+ *
+ */
+public class LazyAccumuloRow extends LazyStruct {
+ private static final Logger log = Logger.getLogger(LazyAccumuloRow.class);
+
+ // The current Accumulo row being deserialized; set via init()
+ private AccumuloHiveRow row;
+ // One ColumnMapping per Hive column, in struct-field order
+ private List<ColumnMapping> columnMappings;
+ // Reused buffer for getFieldsAsList() to avoid reallocation per row
+ private ArrayList<Object> cachedList = new ArrayList<Object>();
+ // Creates the lazy object used for the rowId column (supports custom rowId types)
+ private AccumuloRowIdFactory rowIdFactory;
+
+ public LazyAccumuloRow(LazySimpleStructObjectInspector inspector) {
+ super(inspector);
+ }
+
+ /**
+ * (Re)binds this lazy struct to a new Accumulo row. Marks the struct unparsed so field
+ * state is lazily reset on the next access.
+ */
+ public void init(AccumuloHiveRow hiveRow, List<ColumnMapping> columnMappings,
+ AccumuloRowIdFactory rowIdFactory) {
+ this.row = hiveRow;
+ this.columnMappings = columnMappings;
+ this.rowIdFactory = rowIdFactory;
+ setParsed(false);
+ }
+
+ // Lazily creates the per-field lazy objects (first call only) and resets the
+ // per-field initialized flags for the current row.
+ private void parse() {
+ if (getFields() == null) {
+ // Will properly set string or binary serialization via createLazyField(...)
+ initLazyFields(oi.getAllStructFieldRefs());
+ }
+ if (!getParsed()) {
+ Arrays.fill(getFieldInited(), false);
+ setParsed(true);
+ }
+ }
+
+ @Override
+ public Object getField(int id) {
+ if (!getParsed()) {
+ parse();
+ }
+ return uncheckedGetField(id);
+ }
+
+ /*
+ * split pairs by delimiter.
+ */
+ private Object uncheckedGetField(int id) {
+ if (!getFieldInited()[id]) {
+ ByteArrayRef ref;
+ ColumnMapping columnMapping = columnMappings.get(id);
+
+ if (columnMapping instanceof HiveAccumuloMapColumnMapping) {
+ HiveAccumuloMapColumnMapping mapColumnMapping = (HiveAccumuloMapColumnMapping) columnMapping;
+
+ // Hive map columns: hand the whole row to the LazyAccumuloMap, which filters
+ // by column family / qualifier prefix itself
+ LazyAccumuloMap map = (LazyAccumuloMap) getFields()[id];
+ map.init(row, mapColumnMapping);
+ } else {
+ if (columnMapping instanceof HiveAccumuloRowIdColumnMapping) {
+ // Use the rowID directly
+ // NOTE(review): String.getBytes() uses the platform default charset here -- confirm
+ // rowIds are expected to be platform-charset text
+ ref = new ByteArrayRef();
+ ref.setData(row.getRowId().getBytes());
+ } else if (columnMapping instanceof HiveAccumuloColumnMapping) {
+ HiveAccumuloColumnMapping accumuloColumnMapping = (HiveAccumuloColumnMapping) columnMapping;
+
+ // Use the colfam and colqual to get the value
+ byte[] val = row.getValue(new Text(accumuloColumnMapping.getColumnFamily()), new Text(
+ accumuloColumnMapping.getColumnQualifier()));
+ if (val == null) {
+ // Column absent from this row: return null WITHOUT marking the field
+ // initialized (see HIVE-3179 note below)
+ return null;
+ } else {
+ ref = new ByteArrayRef();
+ ref.setData(val);
+ }
+ } else {
+ log.error("Could not process ColumnMapping of type " + columnMapping.getClass()
+ + " at offset " + id + " in column mapping: " + columnMapping.getMappingSpec());
+ throw new IllegalArgumentException("Cannot process ColumnMapping of type "
+ + columnMapping.getClass());
+ }
+
+ getFields()[id].init(ref, 0, ref.getData().length);
+ }
+
+ // HIVE-3179 only init the field when it isn't null
+ getFieldInited()[id] = true;
+ }
+
+ return getFields()[id].getObject();
+ }
+
+ /**
+ * Returns all field values for the current row, reusing an internal list.
+ */
+ @Override
+ public ArrayList<Object> getFieldsAsList() {
+ if (!getParsed()) {
+ parse();
+ }
+ cachedList.clear();
+ for (int i = 0; i < getFields().length; i++) {
+ cachedList.add(uncheckedGetField(i));
+ }
+ return cachedList;
+ }
+
+ // Chooses the lazy object implementation for each struct field based on its
+ // ColumnMapping: rowId columns come from the factory, map columns become
+ // LazyAccumuloMap, everything else is a plain lazy object honoring binary encoding.
+ @Override
+ protected LazyObjectBase createLazyField(int fieldID, StructField fieldRef) throws SerDeException {
+ final ColumnMapping columnMapping = columnMappings.get(fieldID);
+
+ if (columnMapping instanceof HiveAccumuloRowIdColumnMapping) {
+ return rowIdFactory.createRowId(fieldRef.getFieldObjectInspector());
+ } else if (columnMapping instanceof HiveAccumuloMapColumnMapping) {
+ return new LazyAccumuloMap((LazyMapObjectInspector) fieldRef.getFieldObjectInspector());
+ } else {
+ return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector(),
+ ColumnEncoding.BINARY == columnMapping.getEncoding());
+ }
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hive.accumulo;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.text.MessageFormat;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.jar.JarFile;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Accumulo doesn't have a TableMapReduceUtil.addDependencyJars method like HBase which is very
+ * helpful
+ */
+public class Utils {
+  private static final Logger log = Logger.getLogger(Utils.class);
+
+  // NOTE(review): utility class; consider adding a private constructor if nothing instantiates it
+
+  // Thanks, HBase
+  /**
+   * Adds the jars containing the given classes to the job's "tmpjars" configuration so they are
+   * shipped to the cluster. Entries already present in tmpjars are preserved; classes whose jar
+   * cannot be located or validated are skipped with a warning.
+   */
+  public static void addDependencyJars(Configuration conf, Class<?>... classes) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Set<String> jars = new HashSet<String>();
+    // Add jars that are already in the tmpjars variable
+    jars.addAll(conf.getStringCollection("tmpjars"));
+
+    // add jars as we find them to a map of contents jar name so that we can
+    // avoid creating new jars for classes that have already been packaged.
+    Map<String,String> packagedClasses = new HashMap<String,String>();
+
+    // Add jars containing the specified classes
+    for (Class<?> clazz : classes) {
+      if (clazz == null) {
+        continue;
+      }
+
+      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
+      if (path == null) {
+        log.warn("Could not find jar for class " + clazz + " in order to ship it to the cluster.");
+        continue;
+      }
+      if (!localFs.exists(path)) {
+        log.warn("Could not validate jar file " + path + " for class " + clazz);
+        continue;
+      }
+      jars.add(path.toString());
+    }
+    if (jars.isEmpty()) {
+      return;
+    }
+
+    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
+  }
+
+  /**
+   * If org.apache.hadoop.util.JarFinder is available (0.23+ hadoop), finds the Jar for a class or
+   * creates it if it doesn't exist. If the class is in a directory in the classpath, it creates a
+   * Jar on the fly with the contents of the directory and returns the path to that Jar. If a Jar is
+   * created, it is created in the system temporary directory. Otherwise, returns an existing jar
+   * that contains a class of the same name. Maintains a mapping from jar contents to the tmp jar
+   * created.
+   *
+   * @param my_class
+   *          the class to find.
+   * @param fs
+   *          the FileSystem with which to qualify the returned path.
+   * @param packagedClasses
+   *          a map of class name to path.
+   * @return a jar file that contains the class.
+   * @throws IOException
+   */
+  @SuppressWarnings("deprecation")
+  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
+      Map<String,String> packagedClasses) throws IOException {
+    // attempt to locate an existing jar for the class.
+    String jar = findContainingJar(my_class, packagedClasses);
+    if (null == jar || jar.isEmpty()) {
+      jar = getJar(my_class);
+      updateMap(jar, packagedClasses);
+    }
+
+    if (null == jar || jar.isEmpty()) {
+      return null;
+    }
+
+    log.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
+    return new Path(jar).makeQualified(fs);
+  }
+
+  /**
+   * Add entries to <code>packagedClasses</code> corresponding to class files contained in
+   * <code>jar</code>.
+   *
+   * @param jar
+   *          The jar whose content to list.
+   * @param packagedClasses
+   *          map[class -> jar]
+   */
+  private static void updateMap(String jar, Map<String,String> packagedClasses) throws IOException {
+    if (null == jar || jar.isEmpty()) {
+      return;
+    }
+    ZipFile zip = null;
+    try {
+      zip = new ZipFile(jar);
+      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
+        ZipEntry entry = iter.nextElement();
+        if (entry.getName().endsWith("class")) {
+          packagedClasses.put(entry.getName(), jar);
+        }
+      }
+    } finally {
+      if (null != zip) {
+        zip.close();
+      }
+    }
+  }
+
+  /**
+   * Find a jar that contains a class of the same name, if any. It will return a jar file, even if
+   * that is not the first thing on the class path that has a class with the same name. Looks first
+   * on the classpath and then in the <code>packagedClasses</code> map.
+   *
+   * @param my_class
+   *          the class to find.
+   * @return a jar file that contains the class, or null.
+   * @throws IOException
+   */
+  private static String findContainingJar(Class<?> my_class, Map<String,String> packagedClasses)
+      throws IOException {
+    ClassLoader loader = my_class.getClassLoader();
+    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
+
+    // first search the classpath
+    for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
+      URL url = itr.nextElement();
+      if ("jar".equals(url.getProtocol())) {
+        String toReturn = url.getPath();
+        if (toReturn.startsWith("file:")) {
+          toReturn = toReturn.substring("file:".length());
+        }
+        // URLDecoder is a misnamed class, since it actually decodes
+        // x-www-form-urlencoded MIME type rather than actual
+        // URL encoding (which the file path has). Therefore it would
+        // decode +s to ' 's which is incorrect (spaces are actually
+        // either unencoded or encoded as "%20"). Replace +s first, so
+        // that they are kept sacred during the decoding process.
+        toReturn = toReturn.replaceAll("\\+", "%2B");
+        toReturn = URLDecoder.decode(toReturn, "UTF-8");
+        // Strip the "!/path/inside/jar" suffix of a jar: URL
+        return toReturn.replaceAll("!.*$", "");
+      }
+    }
+
+    // now look in any jars we've packaged using JarFinder. Returns null
+    // when no jar is found.
+    return packagedClasses.get(class_file);
+  }
+
+  /**
+   * Invoke 'getJar' on a JarFinder implementation. Useful for some job configuration contexts
+   * (HBASE-8140) and also for testing on MRv2. First check if we have HADOOP-9426. Lacking that,
+   * fall back to the backport.
+   *
+   * @param my_class
+   *          the class to find.
+   * @return a jar file that contains the class, or null.
+   */
+  private static String getJar(Class<?> my_class) {
+    String ret = null;
+    String hadoopJarFinder = "org.apache.hadoop.util.JarFinder";
+    Class<?> jarFinder = null;
+    try {
+      log.debug("Looking for " + hadoopJarFinder + ".");
+      jarFinder = Class.forName(hadoopJarFinder);
+      log.debug(hadoopJarFinder + " found.");
+      Method getJar = jarFinder.getMethod("getJar", Class.class);
+      ret = (String) getJar.invoke(null, my_class);
+    } catch (ClassNotFoundException e) {
+      log.debug("Using backported JarFinder.");
+      ret = jarFinderGetJar(my_class);
+    } catch (InvocationTargetException e) {
+      // function was properly called, but threw its own exception.
+      // Unwrap it and pass it on.
+      throw new RuntimeException(e.getCause());
+    } catch (Exception e) {
+      // toss all other exceptions, related to reflection failure
+      throw new RuntimeException("getJar invocation failed.", e);
+    }
+
+    return ret;
+  }
+
+  /**
+   * Returns the full path to the Jar containing the class. It always return a JAR.
+   *
+   * @param klass
+   *          class.
+   *
+   * @return path to the Jar containing the class, or null when the class was loaded by the
+   *         bootstrap class loader or no resource for it could be found.
+   */
+  @SuppressWarnings("rawtypes")
+  public static String jarFinderGetJar(Class klass) {
+    Preconditions.checkNotNull(klass, "klass");
+    ClassLoader loader = klass.getClassLoader();
+    if (loader != null) {
+      String class_file = klass.getName().replaceAll("\\.", "/") + ".class";
+      try {
+        for (Enumeration itr = loader.getResources(class_file); itr.hasMoreElements();) {
+          URL url = (URL) itr.nextElement();
+          String path = url.getPath();
+          if (path.startsWith("file:")) {
+            path = path.substring("file:".length());
+          }
+          path = URLDecoder.decode(path, "UTF-8");
+          if ("jar".equals(url.getProtocol())) {
+            path = URLDecoder.decode(path, "UTF-8");
+            return path.replaceAll("!.*$", "");
+          } else if ("file".equals(url.getProtocol())) {
+            // Class is in an exploded directory: package the directory into a temp jar
+            String klassName = klass.getName();
+            klassName = klassName.replace(".", "/") + ".class";
+            path = path.substring(0, path.length() - klassName.length());
+            File baseDir = new File(path);
+            File testDir = new File(System.getProperty("test.build.dir", "target/test-dir"));
+            testDir = testDir.getAbsoluteFile();
+            if (!testDir.exists()) {
+              testDir.mkdirs();
+            }
+            File tempJar = File.createTempFile("hadoop-", "", testDir);
+            tempJar = new File(tempJar.getAbsolutePath() + ".jar");
+            createJar(baseDir, tempJar);
+            return tempJar.getAbsolutePath();
+          }
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Copies the full contents of <code>is</code> into <code>zos</code> as the given entry,
+   * closing <code>is</code> when done.
+   */
+  private static void copyToZipStream(InputStream is, ZipEntry entry, ZipOutputStream zos)
+      throws IOException {
+    try {
+      zos.putNextEntry(entry);
+      byte[] arr = new byte[4096];
+      int read = is.read(arr);
+      while (read > -1) {
+        zos.write(arr, 0, read);
+        read = is.read(arr);
+      }
+    } finally {
+      // Previously the stream was only closed on the happy path, leaking a file
+      // handle whenever the copy failed part way through.
+      is.close();
+    }
+    zos.closeEntry();
+  }
+
+  /**
+   * Writes the contents of <code>dir</code> into <code>zos</code> as a jar, emitting the
+   * manifest first (as the JAR spec requires) and closing <code>zos</code> when finished.
+   */
+  public static void jarDir(File dir, String relativePath, ZipOutputStream zos) throws IOException {
+    Preconditions.checkNotNull(relativePath, "relativePath");
+    Preconditions.checkNotNull(zos, "zos");
+
+    // by JAR spec, if there is a manifest, it must be the first entry in the ZIP.
+    File manifestFile = new File(dir, JarFile.MANIFEST_NAME);
+    ZipEntry manifestEntry = new ZipEntry(JarFile.MANIFEST_NAME);
+    if (!manifestFile.exists()) {
+      zos.putNextEntry(manifestEntry);
+      new Manifest().write(new BufferedOutputStream(zos));
+      zos.closeEntry();
+    } else {
+      InputStream is = new FileInputStream(manifestFile);
+      copyToZipStream(is, manifestEntry, zos);
+    }
+    // Harmless no-op when the entry above is already closed
+    zos.closeEntry();
+    zipDir(dir, relativePath, zos, true);
+    zos.close();
+  }
+
+  /**
+   * Recursively adds the (non-hidden) contents of <code>dir</code> to <code>zos</code> under
+   * <code>relativePath</code>. The manifest is skipped since jarDir emits it first.
+   */
+  private static void zipDir(File dir, String relativePath, ZipOutputStream zos, boolean start)
+      throws IOException {
+    String[] dirList = dir.list();
+    if (null == dirList) {
+      // File.list() returns null when dir is not a directory or an I/O error occurs;
+      // previously this fell through to a NullPointerException in the loop below.
+      throw new IOException("Could not list contents of " + dir);
+    }
+    for (String aDirList : dirList) {
+      File f = new File(dir, aDirList);
+      if (!f.isHidden()) {
+        if (f.isDirectory()) {
+          if (!start) {
+            ZipEntry dirEntry = new ZipEntry(relativePath + f.getName() + "/");
+            zos.putNextEntry(dirEntry);
+            zos.closeEntry();
+          }
+          String filePath = f.getPath();
+          File file = new File(filePath);
+          zipDir(file, relativePath + f.getName() + "/", zos, false);
+        } else {
+          String path = relativePath + f.getName();
+          if (!path.equals(JarFile.MANIFEST_NAME)) {
+            ZipEntry anEntry = new ZipEntry(path);
+            InputStream is = new FileInputStream(f);
+            copyToZipStream(is, anEntry, zos);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Jars the contents of <code>dir</code> into <code>jarFile</code>, creating the target's
+   * parent directories as needed.
+   */
+  private static void createJar(File dir, File jarFile) throws IOException {
+    Preconditions.checkNotNull(dir, "dir");
+    Preconditions.checkNotNull(jarFile, "jarFile");
+    File jarDir = jarFile.getParentFile();
+    if (!jarDir.exists()) {
+      if (!jarDir.mkdirs()) {
+        throw new IOException(MessageFormat.format("could not create dir [{0}]", jarDir));
+      }
+    }
+    JarOutputStream zos = new JarOutputStream(new FileOutputStream(jarFile));
+    jarDir(dir, "", zos);
+  }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnEncoding.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnEncoding.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnEncoding.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnEncoding.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.columns;
+
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * Encapsulate the encoding of values within the given column in Accumulo
+ */
+public enum ColumnEncoding {
+ STRING("string", "s"), BINARY("binary", "b");
+
+ private static final HashMap<String,ColumnEncoding> CODE_CACHE = new HashMap<String,ColumnEncoding>(),
+ NAME_CACHE = new HashMap<String,ColumnEncoding>();
+
+ static {
+ CODE_CACHE.put(STRING.getCode(), STRING);
+ CODE_CACHE.put(BINARY.getCode(), BINARY);
+
+ NAME_CACHE.put(STRING.getName(), STRING);
+ NAME_CACHE.put(BINARY.getName(), BINARY);
+ }
+
+ private final String name;
+ private final String code;
+
+ private ColumnEncoding(String name, String code) {
+ this.name = name;
+ this.code = code;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ public String getCode() {
+ return code;
+ }
+
+ /**
+ * Get the ColumnEncoding which has the given code.
+ *
+ * @param code
+ * The one-character 'code' which uniquely identifies the ColumnEncoding
+ * @return The ColumnEncoding with the code equal to the provided argument
+ */
+ public static ColumnEncoding fromCode(String code) {
+ if (!CODE_CACHE.containsKey(code)) {
+ throw new IllegalArgumentException("No ColumnEncoding defined with code " + code);
+ }
+
+ return CODE_CACHE.get(code);
+ }
+
+ public static ColumnEncoding fromName(String name) {
+ if (!NAME_CACHE.containsKey(name)) {
+ throw new IllegalArgumentException("No ColumnEncoding defined with name " + name);
+ }
+
+ return NAME_CACHE.get(name);
+ }
+
+ public static ColumnEncoding get(String nameOrCode) {
+ ColumnEncoding encoding = CODE_CACHE.get(nameOrCode);
+ if (null != encoding) {
+ return encoding;
+ }
+
+ encoding = NAME_CACHE.get(nameOrCode);
+ if (null != encoding) {
+ return encoding;
+ }
+
+ throw new IllegalArgumentException("No ColumnEncoding defined for " + nameOrCode);
+ }
+
+ public static ColumnEncoding getFromMapping(String columnMapping) {
+ Preconditions.checkNotNull(columnMapping);
+
+ String encoding = getColumnEncoding(columnMapping);
+
+ return get(encoding);
+ }
+
+ /**
+ * Determines if a custom encoding was specified for the give column.
+ *
+ * @param columnMapping
+ * The mapping from Hive column to an Accumulo column
+ * @return True if the column mapping string specifies an encoding, false otherwise
+ */
+ public static boolean hasColumnEncoding(String columnMapping) {
+ Preconditions.checkNotNull(columnMapping);
+
+ int offset = columnMapping.lastIndexOf(AccumuloHiveConstants.POUND);
+
+ // Make sure that the '#' wasn't escaped
+ if (0 < offset && AccumuloHiveConstants.ESCAPE == columnMapping.charAt(offset - 1)) {
+ // The encoding name/codes don't contain pound signs
+ return false;
+ }
+
+ return -1 != offset;
+ }
+
+ public static String getColumnEncoding(String columnMapping) {
+ int offset = columnMapping.lastIndexOf(AccumuloHiveConstants.POUND);
+
+ // Make sure that the '#' wasn't escaped
+ if (0 < offset && AccumuloHiveConstants.ESCAPE == columnMapping.charAt(offset - 1)) {
+ throw new IllegalArgumentException("Column mapping did not contain a column encoding: "
+ + columnMapping);
+ }
+
+ return columnMapping.substring(offset + 1);
+ }
+
+ public static ColumnEncoding getDefault() {
+ return STRING;
+ }
+
+ /**
+ * Removes the column encoding code and separator from the original column mapping string. Throws
+ * an IllegalArgumentException if this method is called on a string that doesn't contain a code.
+ *
+ * @param columnMapping
+ * The mapping from Hive column to Accumulo column
+ * @return The column mapping with the code removed
+ */
+ public static String stripCode(String columnMapping) {
+ Preconditions.checkNotNull(columnMapping);
+
+ int offset = columnMapping.lastIndexOf(AccumuloHiveConstants.POUND);
+ if (-1 == offset
+ || (0 < offset && AccumuloHiveConstants.ESCAPE == columnMapping.charAt(offset - 1))) {
+ throw new IllegalArgumentException(
+ "Provided column mapping does not define a column encoding");
+ }
+
+ return columnMapping.substring(0, offset);
+ }
+
+ public static boolean isMapEncoding(String columnEncoding) {
+ return -1 != columnEncoding.indexOf(AccumuloHiveConstants.COLON);
+ }
+
+ public static Entry<ColumnEncoding,ColumnEncoding> getMapEncoding(String columnEncoding) {
+ int index = columnEncoding.indexOf(AccumuloHiveConstants.COLON);
+ if (-1 == index) {
+ throw new IllegalArgumentException(
+ "Serialized column encoding did not contain a pair of encodings to split");
+ }
+
+ String encoding1 = columnEncoding.substring(0, index), encoding2 = columnEncoding
+ .substring(index + 1);
+
+ return Maps.immutableEntry(get(encoding1), get(encoding2));
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapper.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapper.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapper.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.columns;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
+import org.apache.hadoop.hive.accumulo.serde.TooManyAccumuloColumnsException;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *
+ */
+public class ColumnMapper {
+ private static final Logger log = Logger.getLogger(ColumnMapper.class);
+
+ private List<ColumnMapping> columnMappings;
+ private int rowIdOffset;
+ private HiveAccumuloRowIdColumnMapping rowIdMapping = null;
+ private final ColumnEncoding defaultEncoding;
+
+ /**
+ * Create a mapping from Hive columns (rowID and column) to Accumulo columns (column family and
+ * qualifier). The ordering of the {@link ColumnMapping}s is important as it aligns with the
+ * ordering of the columns for the Hive table schema.
+ *
+ * @param serializedColumnMappings
+ * Comma-separated list of designators that map to Accumulo columns whose offsets
+ * correspond to the Hive table schema
+ * @throws TooManyAccumuloColumnsException
+ */
+ public ColumnMapper(String serializedColumnMappings, String defaultStorageType,
+ List<String> columnNames, List<TypeInfo> columnTypes) throws TooManyAccumuloColumnsException {
+ Preconditions.checkNotNull(serializedColumnMappings);
+
+ String[] parsedColumnMappingValue = StringUtils.split(serializedColumnMappings,
+ AccumuloHiveConstants.COMMA);
+ columnMappings = new ArrayList<ColumnMapping>(parsedColumnMappingValue.length);
+ rowIdOffset = -1;
+
+ // Determine the default encoding type (specified on the table, or the global default
+ // if none was provided)
+ if (null == defaultStorageType || "".equals(defaultStorageType)) {
+ defaultEncoding = ColumnEncoding.getDefault();
+ } else {
+ defaultEncoding = ColumnEncoding.get(defaultStorageType.toLowerCase());
+ }
+
+ if (parsedColumnMappingValue.length > columnNames.size()) {
+ throw new TooManyAccumuloColumnsException("Found " + parsedColumnMappingValue.length
+ + " columns, but only know of " + columnNames.size() + " Hive column names");
+ }
+
+ if (parsedColumnMappingValue.length > columnTypes.size()) {
+ throw new TooManyAccumuloColumnsException("Found " + parsedColumnMappingValue.length
+ + " columns, but only know of " + columnNames.size() + " Hive column types");
+ }
+
+ for (int i = 0; i < parsedColumnMappingValue.length; i++) {
+ String columnMappingStr = parsedColumnMappingValue[i];
+
+ // Create the mapping for this column, with configured encoding
+ ColumnMapping columnMapping = ColumnMappingFactory.get(columnMappingStr, defaultEncoding,
+ columnNames.get(i), columnTypes.get(i));
+
+ if (columnMapping instanceof HiveAccumuloRowIdColumnMapping) {
+ if (-1 != rowIdOffset) {
+ throw new IllegalArgumentException(
+ "Column mapping should only have one definition with a value of "
+ + AccumuloHiveConstants.ROWID);
+ }
+
+ rowIdOffset = i;
+ rowIdMapping = (HiveAccumuloRowIdColumnMapping) columnMapping;
+ }
+
+ columnMappings.add(columnMapping);
+ }
+ }
+
+ public int size() {
+ return columnMappings.size();
+ }
+
+ public ColumnMapping get(int i) {
+ return columnMappings.get(i);
+ }
+
+ public List<ColumnMapping> getColumnMappings() {
+ return Collections.unmodifiableList(columnMappings);
+ }
+
+ public boolean hasRowIdMapping() {
+ return null != rowIdMapping;
+ }
+
+ public HiveAccumuloRowIdColumnMapping getRowIdMapping() {
+ return rowIdMapping;
+ }
+
+ public int getRowIdOffset() {
+ return rowIdOffset;
+ }
+
+ public String getTypesString() {
+ StringBuilder sb = new StringBuilder();
+ for (ColumnMapping columnMapping : columnMappings) {
+ if (sb.length() > 0) {
+ sb.append(AccumuloHiveConstants.COLON);
+ }
+
+ if (columnMapping instanceof HiveAccumuloRowIdColumnMapping) {
+ // the rowID column is a string
+ sb.append(serdeConstants.STRING_TYPE_NAME);
+ } else if (columnMapping instanceof HiveAccumuloColumnMapping) {
+ // a normal column is also a string
+ sb.append(serdeConstants.STRING_TYPE_NAME);
+ } else if (columnMapping instanceof HiveAccumuloMapColumnMapping) {
+ // TODO can we be more precise than string,string?
+ sb.append(serdeConstants.MAP_TYPE_NAME).append("<").append(serdeConstants.STRING_TYPE_NAME)
+ .append(",").append(serdeConstants.STRING_TYPE_NAME).append(">");
+ } else {
+ throw new IllegalArgumentException("Cannot process ColumnMapping of type "
+ + columnMapping.getClass().getName());
+ }
+ }
+
+ return sb.toString();
+ }
+
+ public ColumnMapping getColumnMappingForHiveColumn(List<String> hiveColumns, String hiveColumnName) {
+ Preconditions.checkNotNull(hiveColumns);
+ Preconditions.checkNotNull(hiveColumnName);
+ Preconditions.checkArgument(columnMappings.size() <= hiveColumns.size(),
+ "Expected equal number of column mappings and Hive columns, " + columnMappings + ", "
+ + hiveColumns);
+
+ int hiveColumnOffset = 0;
+ for (; hiveColumnOffset < hiveColumns.size() && hiveColumnOffset < columnMappings.size(); hiveColumnOffset++) {
+ if (hiveColumns.get(hiveColumnOffset).equals(hiveColumnName)) {
+ return columnMappings.get(hiveColumnOffset);
+ }
+ }
+
+ log.error("Could not find offset for Hive column with name '" + hiveColumnName
+ + "' with columns " + hiveColumns);
+ throw new IllegalArgumentException("Could not find offset for Hive column with name "
+ + hiveColumnName);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(32);
+ sb.append("[").append(this.getClass().getSimpleName()).append(" ");
+ sb.append(columnMappings).append(", rowIdOffset: ").append(this.rowIdOffset)
+ .append(", defaultEncoding: ");
+ sb.append(this.defaultEncoding).append("]");
+ return sb.toString();
+ }
+}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapping.java
URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapping.java?rev=1619005&view=auto
==============================================================================
--- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapping.java (added)
+++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMapping.java Tue Aug 19 22:41:10 2014
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.columns;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *
+ */
+public abstract class ColumnMapping {
+
+ // SerDe property for how the Hive column maps to Accumulo
+ protected final String mappingSpec;
+
+ // The manner in which the values in this column are de/serialized from/to Accumulo
+ protected final ColumnEncoding encoding;
+
+ // The name of the Hive column
+ protected final String columnName;
+
+ // The type of the Hive column
+ // Cannot store the actual TypeInfo because that would require
+ // Hive jars on the Accumulo classpath which we don't want
+ protected final String columnType;
+
+ protected ColumnMapping(String mappingSpec, ColumnEncoding encoding, String columnName,
+ String columnType) {
+ Preconditions.checkNotNull(mappingSpec);
+ Preconditions.checkNotNull(encoding);
+ Preconditions.checkNotNull(columnName);
+ Preconditions.checkNotNull(columnType);
+
+ this.mappingSpec = mappingSpec;
+ this.encoding = encoding;
+ this.columnName = columnName;
+ this.columnType = columnType;
+ }
+
+ protected ColumnMapping(String mappingSpec, ColumnEncoding encoding, String columnName,
+ TypeInfo columnType) {
+ Preconditions.checkNotNull(mappingSpec);
+ Preconditions.checkNotNull(encoding);
+ Preconditions.checkNotNull(columnName);
+ Preconditions.checkNotNull(columnType);
+
+ this.mappingSpec = mappingSpec;
+ this.encoding = encoding;
+ this.columnName = columnName;
+ this.columnType = columnType.getTypeName();
+ }
+
+ /**
+ * The property defining how this Column is mapped into Accumulo
+ */
+ public String getMappingSpec() {
+ return mappingSpec;
+ }
+
+ /**
+ * The manner in which the value is encoded in Accumulo
+ */
+ public ColumnEncoding getEncoding() {
+ return encoding;
+ }
+
+ /**
+ * The name of the Hive column this is mapping
+ */
+ public String getColumnName() {
+ return columnName;
+ }
+
+ /**
+ * The @{link TypeInfo} of the Hive column this is mapping
+ */
+ public String getColumnType() {
+ return columnType;
+ }
+}