You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2014/04/10 03:09:01 UTC
svn commit: r1586190 [1/2] - in /hive/branches/branch-0.13: hcatalog/
hcatalog/streaming/ hcatalog/streaming/src/ hcatalog/streaming/src/java/
hcatalog/streaming/src/java/org/ hcatalog/streaming/src/java/org/apache/
hcatalog/streaming/src/java/org/apac...
Author: gates
Date: Thu Apr 10 01:08:59 2014
New Revision: 1586190
URL: http://svn.apache.org/r1586190
Log:
HIVE-5687 Streaming support in Hive (Roshan Naik via gates)
Added:
hive/branches/branch-0.13/hcatalog/streaming/
hive/branches/branch-0.13/hcatalog/streaming/pom.xml
hive/branches/branch-0.13/hcatalog/streaming/src/
hive/branches/branch-0.13/hcatalog/streaming/src/java/
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HeartBeatFailure.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ImpersonationFailed.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidColumn.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidPartition.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTrasactionState.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/PartitionCreationFailed.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/QueryFailedException.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/SerializationError.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingException.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingIOFailure.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatchUnAvailable.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java
hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
hive/branches/branch-0.13/hcatalog/streaming/src/test/
hive/branches/branch-0.13/hcatalog/streaming/src/test/org/
hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/
hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/
hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/
hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/
hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java
hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestDelimitedInputWriter.java
hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
hive/branches/branch-0.13/hcatalog/streaming/src/test/sit
Modified:
hive/branches/branch-0.13/hcatalog/pom.xml
hive/branches/branch-0.13/packaging/pom.xml
hive/branches/branch-0.13/packaging/src/main/assembly/bin.xml
Modified: hive/branches/branch-0.13/hcatalog/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/pom.xml?rev=1586190&r1=1586189&r2=1586190&view=diff
==============================================================================
--- hive/branches/branch-0.13/hcatalog/pom.xml (original)
+++ hive/branches/branch-0.13/hcatalog/pom.xml Thu Apr 10 01:08:59 2014
@@ -44,6 +44,7 @@
<module>webhcat/java-client</module>
<module>webhcat/svr</module>
<module>storage-handlers/hbase</module>
+ <module>streaming</module>
</modules>
<profiles>
Added: hive/branches/branch-0.13/hcatalog/streaming/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/pom.xml?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/pom.xml (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/pom.xml Thu Apr 10 01:08:59 2014
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog</artifactId>
+ <version>0.14.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>hive-hcatalog-streaming</artifactId>
+ <packaging>jar</packaging>
+ <name>Hive HCatalog Streaming</name>
+
+ <properties>
+ <hive.path.to.root>../..</hive.path.to.root>
+ </properties>
+
+ <profiles>
+ <profile>
+ <id>hadoop-1</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <optional>true</optional>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>hadoop-2</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <optional>true</optional>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+ <dependencies>
+ <!-- dependencies are always listed in sorted order by groupId, artifactId -->
+ <!-- intra-project -->
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-cli</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-core</artifactId>
+ <optional>true</optional>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- test -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <sourceDirectory>${basedir}/src/java</sourceDirectory>
+ <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+ <resources>
+ </resources>
+ <plugins>
+ <!-- plugins are always listed in sorted order by groupId, artifactId -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+
+import java.util.Random;
+
+abstract class AbstractRecordWriter implements RecordWriter {
+ static final private Log LOG = LogFactory.getLog(AbstractRecordWriter.class.getName());
+
+ final HiveConf conf;
+ final HiveEndPoint endPoint;
+ final Table tbl;
+
+ final HiveMetaStoreClient msClient;
+ RecordUpdater updater = null;
+
+ private final int totalBuckets;
+ private Random rand = new Random();
+ private int currentBucketId = 0;
+ private final Path partitionPath;
+
+ final AcidOutputFormat<?> outf;
+
+ protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
+ throws ConnectionError, StreamingException {
+ this.endPoint = endPoint;
+ this.conf = conf!=null ? conf
+ : HiveEndPoint.createHiveConf(DelimitedInputWriter.class, endPoint.metaStoreUri);
+ try {
+ msClient = new HiveMetaStoreClient(conf);
+ this.tbl = msClient.getTable(endPoint.database, endPoint.table);
+ this.partitionPath = getPathForEndPoint(msClient, endPoint);
+ this.totalBuckets = tbl.getSd().getNumBuckets();
+ if(totalBuckets <= 0) {
+ throw new StreamingException("Cannot stream to table that has not been bucketed : "
+ + endPoint);
+ }
+ String outFormatName = this.tbl.getSd().getOutputFormat();
+ outf = (AcidOutputFormat<?>) ReflectionUtils.newInstance(Class.forName(outFormatName), conf);
+ } catch (MetaException e) {
+ throw new ConnectionError(endPoint, e);
+ } catch (NoSuchObjectException e) {
+ throw new ConnectionError(endPoint, e);
+ } catch (TException e) {
+ throw new StreamingException(e.getMessage(), e);
+ } catch (ClassNotFoundException e) {
+ throw new StreamingException(e.getMessage(), e);
+ }
+ }
+
+ protected AbstractRecordWriter(HiveEndPoint endPoint)
+ throws ConnectionError, StreamingException {
+ this(endPoint, HiveEndPoint.createHiveConf(AbstractRecordWriter.class, endPoint.metaStoreUri) );
+ }
+
+ abstract SerDe getSerde() throws SerializationError;
+
+ @Override
+ public void flush() throws StreamingIOFailure {
+ try {
+ updater.flush();
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Unable to flush recordUpdater", e);
+ }
+ }
+
+ @Override
+ public void clear() throws StreamingIOFailure {
+ }
+
+ /**
+ * Creates a new record updater for the new batch
+ * @param minTxnId smallest Txnid in the batch
+ * @param maxTxnID largest Txnid in the batch
+ * @throws StreamingIOFailure if failed to create record updater
+ */
+ @Override
+ public void newBatch(Long minTxnId, Long maxTxnID)
+ throws StreamingIOFailure, SerializationError {
+ try {
+ this.currentBucketId = rand.nextInt(totalBuckets);
+ LOG.debug("Creating Record updater");
+ updater = createRecordUpdater(currentBucketId, minTxnId, maxTxnID);
+ } catch (IOException e) {
+ LOG.error("Failed creating record updater", e);
+ throw new StreamingIOFailure("Unable to get new record Updater", e);
+ }
+ }
+
+ @Override
+ public void closeBatch() throws StreamingIOFailure {
+ try {
+ updater.close(false);
+ updater = null;
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Unable to close recordUpdater", e);
+ }
+ }
+
+ private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID)
+ throws IOException, SerializationError {
+ try {
+ return outf.getRecordUpdater(partitionPath,
+ new AcidOutputFormat.Options(conf)
+ .inspector(getSerde().getObjectInspector())
+ .bucket(bucketId)
+ .minimumTransactionId(minTxnId)
+ .maximumTransactionId(maxTxnID));
+ } catch (SerDeException e) {
+ throw new SerializationError("Failed to get object inspector from Serde "
+ + getSerde().getClass().getName(), e);
+ }
+ }
+
+ private Path getPathForEndPoint(HiveMetaStoreClient msClient, HiveEndPoint endPoint)
+ throws StreamingException {
+ try {
+ String location;
+ if(endPoint.partitionVals==null || endPoint.partitionVals.isEmpty() ) {
+ location = msClient.getTable(endPoint.database,endPoint.table)
+ .getSd().getLocation();
+ } else {
+ location = msClient.getPartition(endPoint.database, endPoint.table,
+ endPoint.partitionVals).getSd().getLocation();
+ }
+ return new Path(location);
+ } catch (TException e) {
+ throw new StreamingException(e.getMessage()
+ + ". Unable to get path for end point: "
+ + endPoint.partitionVals, e);
+ }
+ }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+/**
+ * Streaming exception signalling a failure to connect to a {@link HiveEndPoint}.
+ */
+public class ConnectionError extends StreamingException {
+
+  /**
+   * Builds the error with a message naming the end point.
+   * @param endPoint the end point that could not be reached
+   * @param innerEx the underlying cause
+   */
+  public ConnectionError(HiveEndPoint endPoint, Exception innerEx) {
+    super("Error connecting to " + endPoint, innerEx);
+  }
+
+  /**
+   * Builds the error with a caller-supplied message.
+   * @param msg description of the failure
+   * @param innerEx the underlying cause
+   */
+  public ConnectionError(String msg, Exception innerEx) {
+    super(msg, innerEx);
+  }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Streaming Writer handles delimited input (eg. CSV).
+ * Delimited input is parsed & reordered to match column order in table
+ * Uses Lazy Simple Serde to process delimited input
+ */
+public class DelimitedInputWriter extends AbstractRecordWriter {
+ private final boolean reorderingNeeded;
+ private String delimiter;
+ private char serdeSeparator;
+ private int[] fieldToColMapping;
+ private final ArrayList<String> tableColumns;
+ private AbstractSerDe serde = null;
+
+ static final private Log LOG = LogFactory.getLog(DelimitedInputWriter.class.getName());
+
+ /** Constructor. Uses default separator of the LazySimpleSerde
+ * @param colNamesForFields Column name assignment for input fields. nulls or empty
+ * strings in the array indicates the fields to be skipped
+ * @param delimiter input field delimiter
+ * @param endPoint Hive endpoint
+ * @throws ConnectionError Problem talking to Hive
+ * @throws ClassNotFoundException Serde class not found
+ * @throws SerializationError Serde initialization/interaction failed
+ * @throws StreamingException Problem acquiring file system path for partition
+ * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ this(colNamesForFields, delimiter, endPoint, null);
+ }
+
+ /** Constructor. Uses default separator of the LazySimpleSerde
+ * @param colNamesForFields Column name assignment for input fields. nulls or empty
+ * strings in the array indicates the fields to be skipped
+ * @param delimiter input field delimiter
+ * @param endPoint Hive endpoint
+ * @param conf a Hive conf object. Can be null if not using advanced hive settings.
+ * @throws ConnectionError Problem talking to Hive
+ * @throws ClassNotFoundException Serde class not found
+ * @throws SerializationError Serde initialization/interaction failed
+ * @throws StreamingException Problem acquiring file system path for partition
+ * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint, HiveConf conf)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ this(colNamesForFields, delimiter, endPoint, conf,
+ (char) LazySimpleSerDe.DefaultSeparators[0]);
+ }
+
+ /**
+ * Constructor. Allows overriding separator of the LazySimpleSerde
+ * @param colNamesForFields Column name assignment for input fields
+ * @param delimiter input field delimiter
+ * @param endPoint Hive endpoint
+ * @param conf a Hive conf object. Set to null if not using advanced hive settings.
+ * @param serdeSeparator separator used when encoding data that is fed into the
+ * LazySimpleSerde. Ensure this separator does not occur
+ * in the field data
+ * @throws ConnectionError Problem talking to Hive
+ * @throws ClassNotFoundException Serde class not found
+ * @throws SerializationError Serde initialization/interaction failed
+ * @throws StreamingException Problem acquiring file system path for partition
+ * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ super(endPoint, conf);
+ this.tableColumns = getCols(tbl);
+ this.serdeSeparator = serdeSeparator;
+ this.delimiter = delimiter;
+ this.fieldToColMapping = getFieldReordering(colNamesForFields, getTableColumns());
+ this.reorderingNeeded = isReorderingNeeded(delimiter, getTableColumns());
+ LOG.debug("Field reordering needed = " + this.reorderingNeeded + ", for endpoint " + endPoint);
+ this.serdeSeparator = serdeSeparator;
+ }
+
+ private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) {
+ return !( delimiter.equals(String.valueOf(getSerdeSeparator()))
+ && areFieldsInColOrder(fieldToColMapping)
+ && tableColumns.size()>=fieldToColMapping.length );
+ }
+
+ private static boolean areFieldsInColOrder(int[] fieldToColMapping) {
+ for(int i=0; i<fieldToColMapping.length; ++i) {
+ if(fieldToColMapping[i]!=i) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @VisibleForTesting
+ static int[] getFieldReordering(String[] colNamesForFields, List<String> tableColNames)
+ throws InvalidColumn {
+ int[] result = new int[ colNamesForFields.length ];
+ for(int i=0; i<colNamesForFields.length; ++i) {
+ result[i] = -1;
+ }
+ int i=-1, fieldLabelCount=0;
+ for( String col : colNamesForFields ) {
+ ++i;
+ if(col == null) {
+ continue;
+ }
+ if( col.trim().isEmpty() ) {
+ continue;
+ }
+ ++fieldLabelCount;
+ int loc = tableColNames.indexOf(col);
+ if(loc == -1) {
+ throw new InvalidColumn("Column '" + col + "' not found in table for input field " + i+1);
+ }
+ result[i] = loc;
+ }
+ if(fieldLabelCount>tableColNames.size()) {
+ throw new InvalidColumn("Number of field names exceeds the number of columns in table");
+ }
+ return result;
+ }
+
+ // Reorder fields in record based on the order of columns in the table
+ protected byte[] reorderFields(byte[] record) throws UnsupportedEncodingException {
+ if(!reorderingNeeded) {
+ return record;
+ }
+ String[] reorderedFields = new String[getTableColumns().size()];
+ String decoded = new String(record);
+ String[] fields = decoded.split(delimiter);
+ for (int i=0; i<fieldToColMapping.length; ++i) {
+ int newIndex = fieldToColMapping[i];
+ if(newIndex != -1) {
+ reorderedFields[newIndex] = fields[i];
+ }
+ }
+ return join(reorderedFields,getSerdeSeparator());
+ }
+
+ // handles nulls in items[]
+ // TODO: perhaps can be made more efficient by creating a byte[] directly
+ private static byte[] join(String[] items, char separator) {
+ StringBuffer buff = new StringBuffer(100);
+ if(items.length == 0)
+ return "".getBytes();
+ int i=0;
+ for(; i<items.length-1; ++i) {
+ if(items[i]!=null) {
+ buff.append(items[i]);
+ }
+ buff.append(separator);
+ }
+ if(items[i]!=null) {
+ buff.append(items[i]);
+ }
+ return buff.toString().getBytes();
+ }
+
+ protected ArrayList<String> getTableColumns() {
+ return tableColumns;
+ }
+
+ @Override
+ public void write(long transactionId, byte[] record)
+ throws SerializationError, StreamingIOFailure {
+ try {
+ byte[] orderedFields = reorderFields(record);
+ Object encodedRow = encode(orderedFields);
+ updater.insert(transactionId, encodedRow);
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Error writing record in transaction ("
+ + transactionId + ")", e);
+ }
+ }
+
+ @Override
+ SerDe getSerde() throws SerializationError {
+ if(serde!=null) {
+ return serde;
+ }
+ serde = createSerde(tbl, conf);
+ return serde;
+ }
+
+ private Object encode(byte[] record) throws SerializationError {
+ try {
+ BytesWritable blob = new BytesWritable();
+ blob.set(record, 0, record.length);
+ return serde.deserialize(blob);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to convert byte[] record into Object", e);
+ }
+ }
+
+ /**
+ * Creates LazySimpleSerde
+ * @return
+ * @throws SerializationError if serde could not be initialized
+ * @param tbl
+ */
+ protected LazySimpleSerDe createSerde(Table tbl, HiveConf conf)
+ throws SerializationError {
+ try {
+ Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+ tableProps.setProperty("field.delim", String.valueOf(serdeSeparator));
+ LazySimpleSerDe serde = new LazySimpleSerDe();
+ serde.initialize(conf, tableProps);
+ return serde;
+ } catch (SerDeException e) {
+ throw new SerializationError("Error initializing serde", e);
+ }
+ }
+
+ private ArrayList<String> getCols(Table table) {
+ List<FieldSchema> cols = table.getSd().getCols();
+ ArrayList<String> colNames = new ArrayList<String>(cols.size());
+ for (FieldSchema col : cols) {
+ colNames.add(col.getName().toLowerCase());
+ }
+ return colNames;
+ }
+
+ public char getSerdeSeparator() {
+ return serdeSeparator;
+ }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HeartBeatFailure.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HeartBeatFailure.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HeartBeatFailure.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HeartBeatFailure.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Signals a heartbeat error. Carries the transaction ids supplied as aborted and as
+ * non-existent ("no such") so that callers can inspect them.
+ */
+public class HeartBeatFailure extends StreamingException {
+  private final Collection<Long> abortedTxns;
+  private final Collection<Long> nosuchTxns;
+
+  /**
+   * @param abortedTxns ids of transactions reported as aborted
+   * @param nosuchTxns ids of transactions reported as non-existent (shown as
+   *                   "InvalidTxns" in the exception message)
+   */
+  public HeartBeatFailure(Collection<Long> abortedTxns, Set<Long> nosuchTxns) {
+    super("Heart beat error. InvalidTxns: " + nosuchTxns + ". AbortedTxns: " + abortedTxns);
+    this.abortedTxns = abortedTxns;
+    this.nosuchTxns = nosuchTxns;
+  }
+
+  /** @return the aborted transaction ids given to the constructor (not copied) */
+  public Collection<Long> getAbortedTxns() {
+    return abortedTxns;
+  }
+
+  /** @return the non-existent transaction ids given to the constructor (not copied) */
+  public Collection<Long> getNosuchTxns() {
+    return nosuchTxns;
+  }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,823 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Information about the hive end point (i.e. table or partition) to write to.
+ * A light weight object that does NOT internally hold on to resources such as
+ * network connections. It can be stored in Hashed containers such as sets and hash tables.
+ */
+public class HiveEndPoint {
+  /** Metastore URI, e.g. thrift://localhost:9083. If null, the URI already set in the HiveConf is kept. */
+  public final String metaStoreUri;
+  /** Name of the Hive database; never null. */
+  public final String database;
+  /** Name of the table; never null. */
+  public final String table;
+  /** Partition values in partition-column order; empty (never null) when none were given. */
+  public final ArrayList<String> partitionVals;
+
+  private static final Log LOG = LogFactory.getLog(HiveEndPoint.class.getName());
+
+  /**
+   *
+   * @param metaStoreUri URI of the metastore to connect to eg: thrift://localhost:9083
+   * @param database Name of the Hive database
+   * @param table Name of table to stream to
+   * @param partitionVals Indicates the specific partition to stream to. Can be null or empty List
+   *                      if streaming to a table without partitions. The order of values in this
+   *                      list must correspond exactly to the order of partition columns specified
+   *                      during the table creation. E.g. For a table partitioned by
+   *                      (continent string, country string), partitionVals could be the list
+   *                      ("Asia", "India").
+   * @throws IllegalArgumentException if database or table is null
+   */
+  public HiveEndPoint(String metaStoreUri
+          , String database, String table, List<String> partitionVals) {
+    if (database == null) {
+      throw new IllegalArgumentException("Database cannot be null for HiveEndPoint");
+    }
+    if (table == null) {
+      throw new IllegalArgumentException("Table cannot be null for HiveEndPoint");
+    }
+    this.metaStoreUri = metaStoreUri;
+    this.database = database;
+    this.table = table;
+    // Defensive copy: later changes to the caller's list must not alter equals()/hashCode().
+    this.partitionVals = partitionVals == null ? new ArrayList<String>()
+            : new ArrayList<String>(partitionVals);
+  }
+
+
+  /**
+   * Acquire a new connection to MetaStore for streaming, as the user running this process.
+   * @param createPartIfNotExists If true, the partition specified in the endpoint
+   *                              will be auto created if it does not exist
+   * @return a new connection holding an open metastore client; close it when done
+   * @throws ConnectionError if problem connecting
+   * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
+   * @throws InvalidTable if the table is not found while creating the partition
+   * @throws ImpersonationFailed if not able to impersonate 'proxyUser'
+   * @throws PartitionCreationFailed if failed to create partition
+   * @throws InterruptedException if interrupted while acquiring the connection
+   */
+  public StreamingConnection newConnection(final boolean createPartIfNotExists)
+          throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+          , ImpersonationFailed , InterruptedException {
+    return newConnection(null, createPartIfNotExists, null);
+  }
+
+  /**
+   * Acquire a new connection to MetaStore for streaming, as the user running this process.
+   * @param createPartIfNotExists If true, the partition specified in the endpoint
+   *                              will be auto created if it does not exist
+   * @param conf HiveConf object, set it to null if not using advanced hive settings.
+   *             If the endpoint has a metaStoreUri, it is written into this object.
+   * @return a new connection holding an open metastore client; close it when done
+   * @throws ConnectionError if problem connecting
+   * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
+   * @throws InvalidTable if the table is not found while creating the partition
+   * @throws ImpersonationFailed if not able to impersonate 'proxyUser'
+   * @throws PartitionCreationFailed if failed to create partition
+   * @throws InterruptedException if interrupted while acquiring the connection
+   */
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
+          throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+          , ImpersonationFailed , InterruptedException {
+    return newConnection(null, createPartIfNotExists, conf);
+  }
+
+  /**
+   * Acquire a new connection to MetaStore for streaming
+   * @param proxyUser User on whose behalf all hdfs and hive operations will be
+   *                  performed on this connection. Set it to null or empty string
+   *                  to connect as user of current process without impersonation.
+   *                  Currently this argument is not supported and must be null
+   * @param createPartIfNotExists If true, the partition specified in the endpoint
+   *                              will be auto created if it does not exist
+   * @param conf HiveConf to use, or null for a default one
+   * @return a new connection holding an open metastore client
+   * @throws ConnectionError if problem connecting
+   * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
+   * @throws InvalidTable if the table is not found while creating the partition
+   * @throws ImpersonationFailed if not able to impersonate 'proxyUser'
+   * @throws PartitionCreationFailed if failed to create partition
+   * @throws InterruptedException if interrupted while running as the proxy user
+   */
+  private StreamingConnection newConnection(final String proxyUser,
+          final boolean createPartIfNotExists, final HiveConf conf)
+          throws ConnectionError, InvalidPartition,
+          InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
+    if (proxyUser == null || proxyUser.trim().isEmpty()) {
+      // No impersonation: the OS user name is used as the transaction/lock owner.
+      return newConnectionImpl(System.getProperty("user.name"), null, createPartIfNotExists, conf);
+    }
+    final UserGroupInformation ugi = getUserGroupInfo(proxyUser);
+    try {
+      return ugi.doAs (
+              new PrivilegedExceptionAction<StreamingConnection>() {
+                @Override
+                public StreamingConnection run()
+                        throws ConnectionError, InvalidPartition, InvalidTable
+                        , PartitionCreationFailed {
+                  return newConnectionImpl(proxyUser, ugi, createPartIfNotExists, conf);
+                }
+              }
+      );
+    } catch (IOException e) {
+      throw new ImpersonationFailed("Failed to impersonate '" + proxyUser +
+              "' when acquiring connection", e);
+    }
+  }
+
+
+
+  private StreamingConnection newConnectionImpl(String proxyUser, UserGroupInformation ugi,
+                                                boolean createPartIfNotExists, HiveConf conf)
+          throws ConnectionError, InvalidPartition, InvalidTable
+          , PartitionCreationFailed {
+    return new ConnectionImpl(this, proxyUser, ugi, conf, createPartIfNotExists);
+  }
+
+  /** Creates a proxy-user UGI for {@code proxyUser} on top of the current login user. */
+  private static UserGroupInformation getUserGroupInfo(String proxyUser)
+          throws ImpersonationFailed {
+    try {
+      return UserGroupInformation.createProxyUser(
+              proxyUser, UserGroupInformation.getLoginUser());
+    } catch (IOException e) {
+      LOG.error("Unable to login as proxy user. Exception follows.", e);
+      throw new ImpersonationFailed(proxyUser, e);
+    }
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    HiveEndPoint endPoint = (HiveEndPoint) o;
+
+    if (database != null
+            ? !database.equals(endPoint.database)
+            : endPoint.database != null ) {
+      return false;
+    }
+    if (metaStoreUri != null
+            ? !metaStoreUri.equals(endPoint.metaStoreUri)
+            : endPoint.metaStoreUri != null ) {
+      return false;
+    }
+    if (!partitionVals.equals(endPoint.partitionVals)) {
+      return false;
+    }
+    if (table != null ? !table.equals(endPoint.table) : endPoint.table != null) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = metaStoreUri != null ? metaStoreUri.hashCode() : 0;
+    result = 31 * result + (database != null ? database.hashCode() : 0);
+    result = 31 * result + (table != null ? table.hashCode() : 0);
+    result = 31 * result + partitionVals.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "{" +
+            "metaStoreUri='" + metaStoreUri + '\'' +
+            ", database='" + database + '\'' +
+            ", table='" + table + '\'' +
+            ", partitionVals=" + partitionVals + " }";
+  }
+
+
+  private static class ConnectionImpl implements StreamingConnection {
+    private final IMetaStoreClient msClient;
+    private final HiveEndPoint endPt;
+    private final String proxyUser;
+    private final UserGroupInformation ugi;
+
+    /**
+     *
+     * @param endPoint end point to connect to
+     * @param proxyUser  can be null
+     * @param ugi of proxy user. If ugi is null, impersonation of proxy user will be disabled
+     * @param conf HiveConf object, or null to build a default one for endPoint.metaStoreUri
+     * @param createPart create the partition if it does not exist
+     * @throws ConnectionError if there is trouble connecting
+     * @throws InvalidPartition if specified partition does not exist (and createPart=false)
+     * @throws InvalidTable if specified table does not exist
+     * @throws PartitionCreationFailed if createPart=true and not able to create partition
+     */
+    private ConnectionImpl(HiveEndPoint endPoint, String proxyUser, UserGroupInformation ugi,
+                           HiveConf conf, boolean createPart)
+            throws ConnectionError, InvalidPartition, InvalidTable
+                   , PartitionCreationFailed {
+      this.proxyUser = proxyUser;
+      this.endPt = endPoint;
+      this.ugi = ugi;
+      if (conf == null) {
+        conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri);
+      }
+      this.msClient = getMetaStoreClient(endPoint, conf);
+      if (createPart && !endPoint.partitionVals.isEmpty()) {
+        boolean created = false;
+        try {
+          createPartitionIfNotExists(endPoint, msClient, conf);
+          created = true;
+        } finally {
+          if (!created) {
+            // This connection never reaches the caller, so nobody else could close the client.
+            msClient.close();
+          }
+        }
+      }
+    }
+
+    /**
+     * Close connection: closes the metastore client (as the proxy user when one is set).
+     * Errors are logged, not thrown.
+     */
+    @Override
+    public void close() {
+      if (ugi == null) {
+        msClient.close();
+        return;
+      }
+      try {
+        ugi.doAs (
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                msClient.close();
+                return null;
+              }
+            } );
+      } catch (IOException e) {
+        LOG.error("Error closing connection to " + endPt, e);
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted when closing connection to " + endPt, e);
+        // Preserve the interrupt for callers further up the stack.
+        Thread.currentThread().interrupt();
+      }
+    }
+
+
+    /**
+     * Acquires a new batch of transactions from Hive.
+     *
+     * @param numTransactions is a hint from client indicating how many transactions client needs.
+     * @param recordWriter  Used to write record. The same writer instance can
+     *                      be shared with another TransactionBatch (to the same endpoint)
+     *                      only after the first TransactionBatch has been closed.
+     *                      Writer will be closed when the TransactionBatch is closed.
+     * @return a new transaction batch; no transaction is open until beginNextTransaction()
+     * @throws StreamingIOFailure if failed to create new RecordUpdater for batch
+     * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
+     * @throws ImpersonationFailed failed to run command as proxyUser
+     * @throws InterruptedException if interrupted while running as the proxy user
+     */
+    public TransactionBatch fetchTransactionBatch(final int numTransactions,
+                                                  final RecordWriter recordWriter)
+            throws StreamingException, TransactionBatchUnAvailable, ImpersonationFailed
+                  , InterruptedException {
+      if (ugi == null) {
+        return fetchTransactionBatchImpl(numTransactions, recordWriter);
+      }
+      try {
+        return ugi.doAs (
+                new PrivilegedExceptionAction<TransactionBatch>() {
+                  @Override
+                  public TransactionBatch run() throws StreamingException {
+                    return fetchTransactionBatchImpl(numTransactions, recordWriter);
+                  }
+                }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
+                "' when acquiring Transaction Batch on endPoint " + endPt, e);
+      }
+    }
+
+    private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
+                                                  RecordWriter recordWriter)
+            throws StreamingException, TransactionBatchUnAvailable {
+      return new TransactionBatchImpl(proxyUser, ugi, endPt, numTransactions, msClient
+              , recordWriter);
+    }
+
+
+    /**
+     * Runs "alter table ... add if not exists partition ..." for the endpoint's partition
+     * through a Hive Driver in a temporary CLI session.
+     *
+     * @throws InvalidTable if the metastore reports the table as missing
+     * @throws PartitionCreationFailed if a statement fails and the partition is not present
+     * @throws IllegalArgumentException if the number of partition values does not match the
+     *         number of partition keys of the table
+     */
+    private static void createPartitionIfNotExists(HiveEndPoint ep,
+                                                   IMetaStoreClient msClient, HiveConf conf)
+            throws InvalidTable, PartitionCreationFailed {
+      if (ep.partitionVals.isEmpty()) {
+        return;
+      }
+      SessionState state = SessionState.start(new CliSessionState(conf));
+      Driver driver = null;
+      try {
+        driver = new Driver(conf);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Attempting to create partition (if not existent) " + ep);
+        }
+
+        List<FieldSchema> partKeys = msClient.getTable(ep.database, ep.table)
+                .getPartitionKeys();
+        String useDb = "use " + ep.database;
+        if (!runDDL(driver, useDb)) {
+          // Continuing would run the ALTER below against the session's current database
+          // instead of ep.database.
+          throw new PartitionCreationFailed(ep,
+                  new IllegalStateException("Hive statement failed: " + useDb));
+        }
+        String query = "alter table " + ep.table + " add if not exists partition "
+                + partSpecStr(partKeys, ep.partitionVals);
+        // If the ALTER fails, treat it as success only when the partition is nevertheless
+        // present (e.g. another client created it at the same time).
+        if (!runDDL(driver, query) && !partitionExists(msClient, ep)) {
+          throw new PartitionCreationFailed(ep,
+                  new IllegalStateException("Hive statement failed: " + query));
+        }
+      } catch (MetaException e) {
+        LOG.error("Failed to create partition : " + ep, e);
+        throw new PartitionCreationFailed(ep, e);
+      } catch (NoSuchObjectException e) {
+        LOG.error("Failed to create partition : " + ep, e);
+        throw new InvalidTable(ep.database, ep.table);
+      } catch (TException e) {
+        LOG.error("Failed to create partition : " + ep, e);
+        throw new PartitionCreationFailed(ep, e);
+      } catch (QueryFailedException e) {
+        LOG.error("Failed to create partition : " + ep, e);
+        throw new PartitionCreationFailed(ep, e);
+      } finally {
+        if (driver != null) {
+          driver.close();
+        }
+        try {
+          state.close();
+        } catch (IOException e) {
+          LOG.warn("Error closing SessionState used to run Hive DDL.", e);
+        }
+      }
+    }
+
+    /** Returns true if the metastore has the endpoint's partition. */
+    private static boolean partitionExists(IMetaStoreClient msClient, HiveEndPoint ep)
+            throws TException {
+      try {
+        msClient.getPartition(ep.database, ep.table, ep.partitionVals);
+        return true;
+      } catch (NoSuchObjectException e) {
+        return false;
+      }
+    }
+
+    /**
+     * Runs one Hive statement, retrying once if the Driver reports that the command
+     * needs to be retried.
+     *
+     * @return true if the statement completed with response code 0, false if it ran and failed
+     * @throws QueryFailedException if the statement still needed a retry on the last attempt
+     */
+    private static boolean runDDL(Driver driver, String sql) throws QueryFailedException {
+      final int retryCount = 1; // # of times to retry if first attempt fails
+      for (int attempt = 0; attempt <= retryCount; ++attempt) {
+        try {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Running Hive Query: " + sql);
+          }
+          int responseCode = driver.run(sql).getResponseCode();
+          if (responseCode == 0) {
+            return true;
+          }
+          LOG.error("Hive statement failed with response code " + responseCode + ": " + sql);
+          return false;
+        } catch (CommandNeedRetryException e) {
+          if (attempt == retryCount) {
+            throw new QueryFailedException(sql, e);
+          }
+        }
+      } // for
+      return false; // not reached: the last attempt either returns or throws
+    }
+
+    /**
+     * Builds a HiveQL partition spec such as " ( continent='Asia',country='India' )".
+     * Values are escaped so that quotes or backslashes inside them cannot end the literal early.
+     */
+    private static String partSpecStr(List<FieldSchema> partKeys, ArrayList<String> partVals) {
+      if (partKeys.size() != partVals.size()) {
+        throw new IllegalArgumentException("Partition values:" + partVals +
+                ", does not match the partition Keys in table :" + partKeys );
+      }
+      StringBuilder buff = new StringBuilder(partKeys.size() * 20);
+      buff.append(" ( ");
+      for (int i = 0; i < partKeys.size(); ++i) {
+        if (i > 0) {
+          buff.append(",");
+        }
+        buff.append(partKeys.get(i).getName());
+        buff.append("='");
+        buff.append(escapeStringLiteral(partVals.get(i)));
+        buff.append("'");
+      }
+      buff.append(" )");
+      return buff.toString();
+    }
+
+    /** Backslash-escapes backslashes and single quotes for use inside a '...' HiveQL literal. */
+    private static String escapeStringLiteral(String value) {
+      return String.valueOf(value).replace("\\", "\\\\").replace("'", "\\'");
+    }
+
+    /**
+     * Creates a metastore client. Note: when the endpoint has a metaStoreUri it is written
+     * into {@code conf}, mutating the caller's object.
+     */
+    private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf)
+            throws ConnectionError {
+
+      if (endPoint.metaStoreUri != null) {
+        conf.setVar(HiveConf.ConfVars.METASTOREURIS, endPoint.metaStoreUri);
+      }
+
+      try {
+        return new HiveMetaStoreClient(conf);
+      } catch (MetaException e) {
+        throw new ConnectionError("Error connecting to Hive Metastore URI: "
+                + endPoint.metaStoreUri, e);
+      }
+    }
+
+
+  } // class ConnectionImpl
+
+  private static class TransactionBatchImpl implements TransactionBatch {
+    private final String proxyUser;
+    private final UserGroupInformation ugi;
+    private final HiveEndPoint endPt;
+    private final IMetaStoreClient msClient;
+    private final RecordWriter recordWriter;
+    private final List<Long> txnIds;
+
+    /** Index into txnIds of the current transaction; -1 before the first beginNextTransaction(). */
+    private int currentTxnIndex;
+    /** Partition name used for locking, or null for an unpartitioned endpoint. */
+    private final String partNameForLock;
+
+    private TxnState state;
+    private LockRequest lockRequest = null;
+
+    /**
+     * Represents a batch of transactions acquired from MetaStore
+     *
+     * @param proxyUser user recorded as owner of the transactions and locks
+     * @param ugi proxy user's UGI, or null when not impersonating
+     * @param endPt endpoint the batch writes to
+     * @param numTxns number of transactions to open
+     * @param msClient metastore client of the owning connection
+     * @param recordWriter writer that receives the records of this batch
+     * @throws StreamingException if failed to create new RecordUpdater for batch
+     * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
+     */
+    private TransactionBatchImpl(String proxyUser, UserGroupInformation ugi, HiveEndPoint endPt
+            , int numTxns, IMetaStoreClient msClient, RecordWriter recordWriter)
+            throws StreamingException, TransactionBatchUnAvailable {
+      try {
+        if ( endPt.partitionVals != null && !endPt.partitionVals.isEmpty() ) {
+          Table tableObj = msClient.getTable(endPt.database, endPt.table);
+          List<FieldSchema> partKeys = tableObj.getPartitionKeys();
+          partNameForLock = Warehouse.makePartName(partKeys, endPt.partitionVals);
+        } else {
+          partNameForLock = null;
+        }
+        this.proxyUser = proxyUser;
+        this.ugi = ugi;
+        this.endPt = endPt;
+        this.msClient = msClient;
+        this.recordWriter = recordWriter;
+        this.txnIds = msClient.openTxns(proxyUser, numTxns).getTxn_ids();
+        this.currentTxnIndex = -1;
+        this.state = TxnState.INACTIVE;
+        recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size() - 1));
+      } catch (TException e) {
+        throw new TransactionBatchUnAvailable(endPt, e);
+      }
+    }
+
+    @Override
+    public String toString() {
+      if (txnIds == null || txnIds.isEmpty()) {
+        return "{}";
+      }
+      return "TxnIds=[" + txnIds.get(0) + "..." + txnIds.get(txnIds.size() - 1)
+              + "] on endPoint= " + endPt;
+    }
+
+    /**
+     * Activate the next available transaction in the current transaction batch
+     * @throws TransactionError failed to switch to next transaction
+     */
+    @Override
+    public void beginNextTransaction() throws TransactionError, ImpersonationFailed,
+            InterruptedException {
+      if (ugi == null) {
+        beginNextTransactionImpl();
+        return;
+      }
+      try {
+        ugi.doAs (
+              new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws TransactionError {
+                  beginNextTransactionImpl();
+                  return null;
+                }
+              }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser +
+                "' when switch to next Transaction for endPoint :" + endPt, e);
+      }
+    }
+
+    private void beginNextTransactionImpl() throws TransactionError {
+      // The next index must still be inside the batch; checking the current index alone
+      // would step past the last transaction and fail with IndexOutOfBoundsException.
+      if (currentTxnIndex + 1 >= txnIds.size()) {
+        throw new InvalidTrasactionState("No more transactions available in" +
+                " current batch for end point : " + endPt);
+      }
+      ++currentTxnIndex;
+      state = TxnState.INACTIVE; // clear the state left by the previous transaction
+      lockRequest = createLockRequest(endPt, partNameForLock, proxyUser, getCurrentTxnId());
+      try {
+        LockResponse res = msClient.lock(lockRequest);
+        if (res.getState() != LockState.ACQUIRED) {
+          throw new TransactionError("Unable to acquire lock on " + endPt);
+        }
+      } catch (TException e) {
+        throw new TransactionError("Unable to acquire lock on " + endPt, e);
+      }
+
+      state = TxnState.OPEN;
+    }
+
+    /**
+     * Get Id of currently open transaction
+     * @return id of the current transaction (only valid after beginNextTransaction())
+     */
+    @Override
+    public Long getCurrentTxnId() {
+      return txnIds.get(currentTxnIndex);
+    }
+
+    /**
+     * get state of current transaction
+     * @return state of the current transaction
+     */
+    @Override
+    public TxnState getCurrentTransactionState() {
+      return state;
+    }
+
+    /**
+     * Remaining transactions are the ones that are not committed or aborted or active.
+     * Active transaction is not considered part of remaining txns.
+     * @return number of transactions remaining this batch.
+     */
+    @Override
+    public int remainingTransactions() {
+      if (currentTxnIndex >= 0) {
+        return txnIds.size() - currentTxnIndex - 1;
+      }
+      return txnIds.size();
+    }
+
+
+    /**
+     *  Write record using RecordWriter
+     * @param record  the data to be written
+     * @throws StreamingIOFailure I/O failure
+     * @throws SerializationError  serialization error
+     * @throws ImpersonationFailed error writing on behalf of proxyUser
+     * @throws InterruptedException if interrupted while running as the proxy user
+     */
+    @Override
+    public void write(final byte[] record)
+            throws StreamingException, InterruptedException,
+            ImpersonationFailed {
+      if (ugi == null) {
+        recordWriter.write(getCurrentTxnId(), record);
+        return;
+      }
+      try {
+        ugi.doAs (
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws StreamingException {
+                recordWriter.write(getCurrentTxnId(), record);
+                return null;
+              }
+            }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
+                "' when writing to endPoint :" + endPt + ". Transaction Id: "
+                + getCurrentTxnId(), e);
+      }
+    }
+
+
+    /**
+     *  Write records using RecordWriter
+     * @param records collection of rows to be written
+     * @throws StreamingException  serialization error
+     * @throws ImpersonationFailed error writing on behalf of proxyUser
+     * @throws InterruptedException if interrupted while running as the proxy user
+     */
+    @Override
+    public void write(final Collection<byte[]> records)
+            throws StreamingException, InterruptedException,
+            ImpersonationFailed {
+      if (ugi == null) {
+        writeImpl(records);
+        return;
+      }
+      try {
+        ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+                  @Override
+                  public Void run() throws StreamingException {
+                    writeImpl(records);
+                    return null;
+                  }
+                }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser +
+                "' when writing to endPoint :" + endPt + ". Transaction Id: "
+                + getCurrentTxnId(), e);
+      }
+    }
+
+    private void writeImpl(Collection<byte[]> records)
+            throws StreamingException {
+      for (byte[] record : records) {
+        recordWriter.write(getCurrentTxnId(), record);
+      }
+    }
+
+
+    /**
+     * Commit the currently open transaction
+     * @throws TransactionError if the metastore rejects the commit
+     * @throws StreamingIOFailure  if flushing records failed
+     * @throws ImpersonationFailed if not able to run as the proxy user
+     * @throws InterruptedException if interrupted while running as the proxy user
+     */
+    @Override
+    public void commit()  throws TransactionError, StreamingException,
+           ImpersonationFailed, InterruptedException {
+      if (ugi == null) {
+        commitImpl();
+        return;
+      }
+      try {
+        ugi.doAs (
+              new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws StreamingException {
+                  commitImpl();
+                  return null;
+                }
+              }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
+                "' when committing Txn on endPoint :" + endPt + ". Transaction Id: "
+                + getCurrentTxnId(), e);
+      }
+
+    }
+
+    /** Flushes buffered records, then commits the current transaction in the metastore. */
+    private void commitImpl() throws TransactionError, StreamingException {
+      try {
+        recordWriter.flush();
+        msClient.commitTxn(getCurrentTxnId());
+        state = TxnState.COMMITTED;
+      } catch (NoSuchTxnException e) {
+        throw new TransactionError("Invalid transaction id : "
+                + getCurrentTxnId(), e);
+      } catch (TxnAbortedException e) {
+        throw new TransactionError("Aborted transaction cannot be committed"
+                , e);
+      } catch (TException e) {
+        throw new TransactionError("Unable to commit transaction "
+                + getCurrentTxnId(), e);
+      }
+    }
+
+    /**
+     * Abort the currently open transaction
+     * @throws TransactionError if the metastore rejects the rollback
+     */
+    @Override
+    public void abort() throws TransactionError, StreamingException
+                      , ImpersonationFailed, InterruptedException {
+      if (ugi == null) {
+        abortImpl();
+        return;
+      }
+      try {
+        ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+                  @Override
+                  public Void run() throws StreamingException {
+                    abortImpl();
+                    return null;
+                  }
+                }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
+                "' when aborting Txn on endPoint :" + endPt + ". Transaction Id: "
+                + getCurrentTxnId(), e);
+      }
+    }
+
+    /** Discards buffered records, then rolls back the current transaction in the metastore. */
+    private void abortImpl() throws TransactionError, StreamingException {
+      try {
+        recordWriter.clear();
+        msClient.rollbackTxn(getCurrentTxnId());
+        state = TxnState.ABORTED;
+      } catch (NoSuchTxnException e) {
+        throw new TransactionError("Unable to abort invalid transaction id : "
+                + getCurrentTxnId(), e);
+      } catch (TException e) {
+        throw new TransactionError("Unable to abort transaction id : "
+                + getCurrentTxnId(), e);
+      }
+    }
+
+    /**
+     * Heartbeats every transaction from the current one (or the first one, if none has been
+     * started yet) to the end of the batch.
+     * @throws HeartBeatFailure if the metastore reports aborted or unknown transactions
+     * @throws StreamingException if the heartbeat call itself fails
+     */
+    @Override
+    public void heartbeat() throws StreamingException, HeartBeatFailure {
+      int firstIndex = currentTxnIndex < 0 ? 0 : currentTxnIndex;
+      Long first = txnIds.get(firstIndex);
+      Long last = txnIds.get(txnIds.size() - 1);
+      try {
+        HeartbeatTxnRangeResponse resp = msClient.heartbeatTxnRange(first, last);
+        if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
+          throw new HeartBeatFailure(resp.getAborted(), resp.getNosuch());
+        }
+      } catch (TException e) {
+        throw new StreamingException("Failure to heartbeat on ids (" + first + "..."
+                + last + ") on end point : " + endPt, e);
+      }
+    }
+
+    /**
+     * Close the TransactionBatch
+     * @throws StreamingIOFailure I/O failure when closing transaction batch
+     */
+    @Override
+    public void close() throws StreamingException, ImpersonationFailed, InterruptedException {
+      if (ugi == null) {
+        state = TxnState.INACTIVE;
+        recordWriter.closeBatch();
+        return;
+      }
+      try {
+        ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+                  @Override
+                  public Void run() throws StreamingException {
+                    state = TxnState.INACTIVE;
+                    recordWriter.closeBatch();
+                    return null;
+                  }
+                }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
+                "' when closing Txn Batch on endPoint :" + endPt, e);
+      }
+    }
+
+    /** Builds a shared-lock request on the endpoint's table (and partition, when given). */
+    private static LockRequest createLockRequest(final HiveEndPoint hiveEndPoint,
+            String partNameForLock, String user, long txnId)  {
+      LockRequestBuilder rqstBuilder = new LockRequestBuilder();
+      rqstBuilder.setUser(user);
+      rqstBuilder.setTransactionId(txnId);
+
+      LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+              .setDbName(hiveEndPoint.database)
+              .setTableName(hiveEndPoint.table)
+              .setShared();
+      if (partNameForLock != null && !partNameForLock.isEmpty() ) {
+          lockCompBuilder.setPartitionName(partNameForLock);
+      }
+      rqstBuilder.addLockComponent(lockCompBuilder.build());
+
+      return rqstBuilder.build();
+    }
+  } // class TransactionBatchImpl
+
+  /**
+   * Creates a HiveConf with the DbTxnManager, concurrency support and metastore set_ugi
+   * enabled; the metastore URI is set only when {@code metaStoreUri} is non-null.
+   */
+  static HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) {
+    HiveConf conf = new HiveConf(clazz);
+    conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER,
+            "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+    conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+    if (metaStoreUri != null) {
+      conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
+    }
+    return conf;
+  }
+
+}  // class HiveEndPoint
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ImpersonationFailed.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ImpersonationFailed.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ImpersonationFailed.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ImpersonationFailed.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+/**
+ * Raised when an operation could not be carried out on behalf of another user.
+ */
+public class ImpersonationFailed extends StreamingException {
+
+  /**
+   * @param username name (or description) of the user that could not be impersonated;
+   *                 it is appended verbatim to the exception message
+   * @param e the underlying failure, kept as the cause
+   */
+  public ImpersonationFailed(String username, Exception e) {
+    super(describe(username), e);
+  }
+
+  /** Formats the exception message for the given user. */
+  private static String describe(String who) {
+    return "Failed to impersonate user " + who;
+  }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidColumn.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidColumn.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidColumn.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidColumn.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+/**
+ * Signals a problem with a column; the caller supplies the full description.
+ */
+public class InvalidColumn extends StreamingException {
+
+  /**
+   * @param message description of the problem, used unchanged as the exception message
+   */
+  public InvalidColumn(String message) {
+    super(message);
+  }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidPartition.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidPartition.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidPartition.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidPartition.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+/**
+ * Signals that a partition name/value pair was rejected.
+ */
+public class InvalidPartition extends StreamingException {
+
+  /**
+   * @param partitionName name of the partition column
+   * @param partitionValue the offending value
+   */
+  public InvalidPartition(String partitionName, String partitionValue) {
+    super(makeMsg(partitionName, partitionValue));
+  }
+
+  /** Builds the exception message from the column name and value. */
+  private static String makeMsg(String name, String value) {
+    return "Invalid partition: Name=" + name + ", Value=" + value;
+  }
+
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+/**
+ * Signals that a database/table pair is invalid.
+ */
+public class InvalidTable extends StreamingException {
+
+  /**
+   * @param db    the database name reported in the message
+   * @param table the table name reported in the message
+   */
+  public InvalidTable(String db, String table) {
+    // Passing an explicit null cause (instead of using the one-argument constructor)
+    // fixes the cause at null, so Throwable.initCause() will reject a later call.
+    super("Invalid table db:" + db + ", table:" + table, null);
+  }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTrasactionState.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTrasactionState.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTrasactionState.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTrasactionState.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+/**
+ * {@link TransactionError} reporting an invalid transaction state; the details
+ * are supplied by the caller as the message.
+ *
+ * <p>NOTE(review): the class name misspells "Transaction". It is public API, so
+ * a rename would need a deprecation path rather than an in-place change.
+ */
+public class InvalidTrasactionState extends TransactionError {
+
+  /**
+   * @param msg description of the invalid state
+   */
+  public InvalidTrasactionState(String msg) {
+    super(msg);
+  }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/PartitionCreationFailed.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/PartitionCreationFailed.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/PartitionCreationFailed.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/PartitionCreationFailed.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+/**
+ * Signals that a partition could not be created for a {@link HiveEndPoint}.
+ */
+public class PartitionCreationFailed extends StreamingException {
+
+  /**
+   * @param endPoint the end point whose partition could not be created; its
+   *                 string form is embedded in the message
+   * @param cause    the underlying failure, kept as this exception's cause
+   */
+  public PartitionCreationFailed(HiveEndPoint endPoint, Exception cause) {
+    super(messageFor(endPoint), cause);
+  }
+
+  /** Builds {@code "Failed to create partition <endPoint>"}. */
+  private static String messageFor(HiveEndPoint endPoint) {
+    return "Failed to create partition " + endPoint;
+  }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/QueryFailedException.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/QueryFailedException.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/QueryFailedException.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/QueryFailedException.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+
+/**
+ * Signals that a query failed. The failing query text is retained and can be
+ * read back via {@link #getQuery()}.
+ */
+public class QueryFailedException extends StreamingException {
+
+  /** The query that failed. Package-private so existing in-package code can still read it. */
+  final String query;
+
+  /**
+   * @param query the query that failed; included in the message
+   * @param e     the exception raised for the query; its message is included in this
+   *              exception's message and it is kept as the cause
+   */
+  public QueryFailedException(String query, CommandNeedRetryException e) {
+    // Null-safe: a null cause yields "Due to :null" instead of a NullPointerException
+    // being thrown while the exception itself is being constructed.
+    super("Query failed: " + query + ". Due to :" + (e == null ? null : e.getMessage()), e);
+    this.query = query;
+  }
+
+  /**
+   * @return the query that failed
+   */
+  public String getQuery() {
+    return query;
+  }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+/**
+ * Writes records on behalf of a transaction batch. Besides {@link #write}, the
+ * writer is driven through the lifecycle callbacks documented below.
+ */
+public interface RecordWriter {
+
+  /**
+   * Writes a record using a Hive RecordUpdater.
+   *
+   * @param transactionId the ID of the transaction in which the write occurs
+   * @param record        the record to be written
+   * @throws StreamingException if the write fails
+   */
+  void write(long transactionId, byte[] record) throws StreamingException;
+
+  /**
+   * Flushes buffered records. Invoked by {@code TransactionBatch.commit()}.
+   *
+   * @throws StreamingException if the flush fails
+   */
+  void flush() throws StreamingException;
+
+  /**
+   * Clears buffered writes. Invoked by {@code TransactionBatch.abort()}.
+   *
+   * @throws StreamingException if the buffered writes cannot be cleared
+   */
+  void clear() throws StreamingException;
+
+  /**
+   * Acquires a new RecordUpdater. Invoked when
+   * {@code StreamingConnection.fetchTransactionBatch()} is called.
+   *
+   * @param minTxnId the minimum transaction ID
+   * @param maxTxnID the maximum transaction ID
+   * @throws StreamingException if a RecordUpdater cannot be acquired
+   */
+  void newBatch(Long minTxnId, Long maxTxnID) throws StreamingException;
+
+  /**
+   * Closes the RecordUpdater. Invoked by {@code TransactionBatch.close()}.
+   *
+   * @throws StreamingException if closing fails
+   */
+  void closeBatch() throws StreamingException;
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/SerializationError.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/SerializationError.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/SerializationError.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/SerializationError.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+/**
+ * Signals a serialization failure, carrying the underlying exception as its cause.
+ */
+public class SerializationError extends StreamingException {
+
+  /**
+   * @param msg detail message
+   * @param e   the underlying exception, kept as the cause
+   */
+  public SerializationError(String msg, Exception e) {
+    super(msg, e);
+  }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+/**
+ * Represents a connection to a HiveEndPoint. Used to acquire transaction batches.
+ */
+public interface StreamingConnection {
+
+ /**
+ * Acquires a new batch of transactions from Hive.
+
+ * @param numTransactionsHint is a hint from client indicating how many transactions client needs.
+ * @param writer Used to write record. The same writer instance can
+ * be shared with another TransactionBatch (to the same endpoint)
+ * only after the first TransactionBatch has been closed.
+ * Writer will be closed when the TransactionBatch is closed.
+ * @return
+ * @throws ConnectionError
+ * @throws InvalidPartition
+ * @throws StreamingException
+ * @return a batch of transactions
+ */
+ public TransactionBatch fetchTransactionBatch(int numTransactionsHint,
+ RecordWriter writer)
+ throws ConnectionError, StreamingException, InterruptedException;
+
+ /**
+ * Close connection
+ */
+ public void close();
+
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingException.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingException.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingException.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingException.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+/**
+ * Checked exception that is the common supertype of the streaming errors in this package.
+ */
+public class StreamingException extends Exception {
+
+  /**
+   * @param msg   detail message
+   * @param cause underlying exception; may be {@code null}
+   */
+  public StreamingException(String msg, Exception cause) {
+    super(msg, cause);
+  }
+
+  /**
+   * Same as {@link #StreamingException(String, Exception)} but accepts any {@link Throwable}
+   * cause, so failures such as an {@link Error} can be wrapped without being lost.
+   * Calls passing an {@code Exception} (or a {@code null} literal) still resolve to the
+   * more specific {@code Exception} overload, so existing callers are unaffected.
+   *
+   * @param msg   detail message
+   * @param cause underlying throwable; may be {@code null}
+   */
+  public StreamingException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+  /**
+   * @param msg detail message
+   */
+  public StreamingException(String msg) {
+    super(msg);
+  }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingIOFailure.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingIOFailure.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingIOFailure.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingIOFailure.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+/**
+ * Signals an I/O-related streaming failure (per the class name), optionally wrapping a cause.
+ */
+public class StreamingIOFailure extends StreamingException {
+
+  /**
+   * @param msg detail message
+   */
+  public StreamingIOFailure(String msg) {
+    super(msg);
+  }
+
+  /**
+   * @param msg   detail message
+   * @param cause the underlying exception, kept as the cause
+   */
+  public StreamingIOFailure(String msg, Exception cause) {
+    super(msg, cause);
+  }
+}