Posted to commits@hive.apache.org by ga...@apache.org on 2014/04/10 03:09:01 UTC

svn commit: r1586190 [1/2] - in /hive/branches/branch-0.13: hcatalog/ hcatalog/streaming/ hcatalog/streaming/src/ hcatalog/streaming/src/java/ hcatalog/streaming/src/java/org/ hcatalog/streaming/src/java/org/apache/ hcatalog/streaming/src/java/org/apac...

Author: gates
Date: Thu Apr 10 01:08:59 2014
New Revision: 1586190

URL: http://svn.apache.org/r1586190
Log:
HIVE-5687 Streaming support in Hive (Roshan Naik via gates)
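
A minimal, hypothetical sketch of how the classes added below fit together
(the metastore URI, database/table names, and row values are placeholders,
not part of this commit; TransactionBatch.close() is assumed from the
TransactionBatch.java interface added below):

    import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
    import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    import org.apache.hive.hcatalog.streaming.StreamingConnection;
    import org.apache.hive.hcatalog.streaming.TransactionBatch;

    import java.util.Arrays;

    public class StreamingExample {
      public static void main(String[] args) throws Exception {
        // Target a specific partition; the table must be bucketed
        // (AbstractRecordWriter below rejects unbucketed tables).
        HiveEndPoint endPoint = new HiveEndPoint("thrift://localhost:9083",
                "default", "alerts", Arrays.asList("Asia", "India"));
        StreamingConnection conn = endPoint.newConnection(true); // auto-create partition

        // Input fields "id" and "msg" map onto like-named table columns.
        DelimitedInputWriter writer =
                new DelimitedInputWriter(new String[]{"id", "msg"}, ",", endPoint);

        TransactionBatch batch = conn.fetchTransactionBatch(10, writer);
        while (batch.remainingTransactions() > 0) {
          batch.beginNextTransaction();
          batch.write("1,hello".getBytes());
          batch.write("2,world".getBytes());
          batch.commit();
        }
        batch.close();  // assumed close() declared in TransactionBatch.java
        conn.close();
      }
    }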

Added:
    hive/branches/branch-0.13/hcatalog/streaming/
    hive/branches/branch-0.13/hcatalog/streaming/pom.xml
    hive/branches/branch-0.13/hcatalog/streaming/src/
    hive/branches/branch-0.13/hcatalog/streaming/src/java/
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HeartBeatFailure.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ImpersonationFailed.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidColumn.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidPartition.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTrasactionState.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/PartitionCreationFailed.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/QueryFailedException.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/SerializationError.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingException.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingIOFailure.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatchUnAvailable.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java
    hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
    hive/branches/branch-0.13/hcatalog/streaming/src/test/
    hive/branches/branch-0.13/hcatalog/streaming/src/test/org/
    hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/
    hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/
    hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/
    hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/
    hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java
    hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestDelimitedInputWriter.java
    hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
    hive/branches/branch-0.13/hcatalog/streaming/src/test/sit
Modified:
    hive/branches/branch-0.13/hcatalog/pom.xml
    hive/branches/branch-0.13/packaging/pom.xml
    hive/branches/branch-0.13/packaging/src/main/assembly/bin.xml

Modified: hive/branches/branch-0.13/hcatalog/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/pom.xml?rev=1586190&r1=1586189&r2=1586190&view=diff
==============================================================================
--- hive/branches/branch-0.13/hcatalog/pom.xml (original)
+++ hive/branches/branch-0.13/hcatalog/pom.xml Thu Apr 10 01:08:59 2014
@@ -44,6 +44,7 @@
     <module>webhcat/java-client</module>
     <module>webhcat/svr</module>
     <module>storage-handlers/hbase</module>
+    <module>streaming</module>
   </modules>
 
   <profiles>

Added: hive/branches/branch-0.13/hcatalog/streaming/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/pom.xml?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/pom.xml (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/pom.xml Thu Apr 10 01:08:59 2014
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hive.hcatalog</groupId>
+    <artifactId>hive-hcatalog</artifactId>
+    <version>0.14.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>hive-hcatalog-streaming</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive HCatalog Streaming</name>
+
+  <properties>
+    <hive.path.to.root>../..</hive.path.to.root>
+  </properties>
+
+  <profiles>
+  <profile>
+    <id>hadoop-1</id>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-core</artifactId>
+        <optional>true</optional>
+      </dependency>
+    </dependencies>
+  </profile>
+  <profile>
+    <id>hadoop-2</id>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-common</artifactId>
+        <optional>true</optional>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-mapreduce-client-core</artifactId>
+        <optional>true</optional>
+      </dependency>
+    </dependencies>
+  </profile>
+  </profiles>
+
+  <dependencies>
+    <!-- dependencies are always listed in sorted order by groupId, artifactId -->
+    <!-- intra-project -->
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-serde</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-cli</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-core</artifactId>
+      <optional>true</optional>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- test -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <sourceDirectory>${basedir}/src/java</sourceDirectory>
+    <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+    <resources>
+    </resources>
+    <plugins>
+      <!-- plugins are always listed in sorted order by groupId, artifactId -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+
+import java.util.Random;
+
+abstract class AbstractRecordWriter implements RecordWriter {
+  static final private Log LOG = LogFactory.getLog(AbstractRecordWriter.class.getName());
+
+  final HiveConf conf;
+  final HiveEndPoint endPoint;
+  final Table tbl;
+
+  final HiveMetaStoreClient msClient;
+  RecordUpdater updater = null;
+
+  private final int totalBuckets;
+  private Random rand = new Random();
+  private int currentBucketId = 0;
+  private final Path partitionPath;
+
+  final AcidOutputFormat<?> outf;
+
+  protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
+          throws ConnectionError, StreamingException {
+    this.endPoint = endPoint;
+    this.conf = conf!=null ? conf
+                : HiveEndPoint.createHiveConf(AbstractRecordWriter.class, endPoint.metaStoreUri);
+    try {
+      msClient = new HiveMetaStoreClient(this.conf);
+      this.tbl = msClient.getTable(endPoint.database, endPoint.table);
+      this.partitionPath = getPathForEndPoint(msClient, endPoint);
+      this.totalBuckets = tbl.getSd().getNumBuckets();
+      if(totalBuckets <= 0) {
+        throw new StreamingException("Cannot stream to table that has not been bucketed : "
+                + endPoint);
+      }
+      String outFormatName = this.tbl.getSd().getOutputFormat();
+      outf = (AcidOutputFormat<?>) ReflectionUtils.newInstance(Class.forName(outFormatName), this.conf);
+    } catch (MetaException e) {
+      throw new ConnectionError(endPoint, e);
+    } catch (NoSuchObjectException e) {
+      throw new ConnectionError(endPoint, e);
+    } catch (TException e) {
+      throw new StreamingException(e.getMessage(), e);
+    } catch (ClassNotFoundException e) {
+      throw new StreamingException(e.getMessage(), e);
+    }
+  }
+
+  protected AbstractRecordWriter(HiveEndPoint endPoint)
+          throws ConnectionError, StreamingException {
+    this(endPoint, HiveEndPoint.createHiveConf(AbstractRecordWriter.class, endPoint.metaStoreUri) );
+  }
+
+  abstract SerDe getSerde() throws SerializationError;
+
+  @Override
+  public void flush() throws StreamingIOFailure {
+    try {
+      updater.flush();
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Unable to flush recordUpdater", e);
+    }
+  }
+
+  @Override
+  public void clear() throws StreamingIOFailure {
+  }
+
+  /**
+   * Creates a new record updater for the new batch
+   * @param minTxnId smallest Txnid in the batch
+   * @param maxTxnID largest Txnid in the batch
+   * @throws StreamingIOFailure if failed to create record updater
+   */
+  @Override
+  public void newBatch(Long minTxnId, Long maxTxnID)
+          throws StreamingIOFailure, SerializationError {
+    try {
+      this.currentBucketId = rand.nextInt(totalBuckets);
+      LOG.debug("Creating Record updater");
+      updater = createRecordUpdater(currentBucketId, minTxnId, maxTxnID);
+    } catch (IOException e) {
+      LOG.error("Failed creating record updater", e);
+      throw new StreamingIOFailure("Unable to get new record Updater", e);
+    }
+  }
+
+  @Override
+  public void closeBatch() throws StreamingIOFailure {
+    try {
+      updater.close(false);
+      updater = null;
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Unable to close recordUpdater", e);
+    }
+  }
+
+  private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID)
+          throws IOException, SerializationError {
+    try {
+      return  outf.getRecordUpdater(partitionPath,
+              new AcidOutputFormat.Options(conf)
+                      .inspector(getSerde().getObjectInspector())
+                      .bucket(bucketId)
+                      .minimumTransactionId(minTxnId)
+                      .maximumTransactionId(maxTxnID));
+    } catch (SerDeException e) {
+      throw new SerializationError("Failed to get object inspector from Serde "
+              + getSerde().getClass().getName(), e);
+    }
+  }
+
+  private Path getPathForEndPoint(HiveMetaStoreClient msClient, HiveEndPoint endPoint)
+          throws StreamingException {
+    try {
+      String location;
+      if(endPoint.partitionVals==null || endPoint.partitionVals.isEmpty() ) {
+        location = msClient.getTable(endPoint.database,endPoint.table)
+                .getSd().getLocation();
+      } else {
+        location = msClient.getPartition(endPoint.database, endPoint.table,
+                endPoint.partitionVals).getSd().getLocation();
+      }
+      return new Path(location);
+    } catch (TException e) {
+      throw new StreamingException(e.getMessage()
+              + ". Unable to get path for end point: "
+              + endPoint.partitionVals, e);
+    }
+  }
+}
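
A hedged sketch of the writer lifecycle this class implements. In the code
added below, TransactionBatchImpl in HiveEndPoint.java drives these calls;
the writer, transaction ids, and row bytes here are stand-ins:

    import org.apache.hive.hcatalog.streaming.RecordWriter;

    class BatchDriver {
      // One batch: create an updater, write rows under a txn id, flush at
      // commit, close when the batch is exhausted.
      static void driveBatch(RecordWriter writer, long minTxn, long maxTxn, byte[] row)
              throws Exception {
        writer.newBatch(minTxn, maxTxn);  // picks a random bucket, creates a RecordUpdater
        writer.write(minTxn, row);        // encode the row and insert it under minTxn
        writer.flush();                   // TransactionBatch.commit() calls this
        writer.closeBatch();              // closes the underlying RecordUpdater
      }
    }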

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+public class ConnectionError extends StreamingException {
+
+  public ConnectionError(String msg, Exception innerEx) {
+    super(msg, innerEx);
+  }
+
+  public ConnectionError(HiveEndPoint endPoint, Exception innerEx) {
+    super("Error connecting to " + endPoint, innerEx);
+  }
+}

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Streaming writer that handles delimited input (e.g. CSV).
+ * Delimited input is parsed and reordered to match the column order of the table.
+ * Uses LazySimpleSerDe to process the delimited input.
+ */
+public class DelimitedInputWriter extends AbstractRecordWriter {
+  private final boolean reorderingNeeded;
+  private String delimiter;
+  private char serdeSeparator;
+  private int[] fieldToColMapping;
+  private final ArrayList<String> tableColumns;
+  private AbstractSerDe serde = null;
+
+  static final private Log LOG = LogFactory.getLog(DelimitedInputWriter.class.getName());
+
+  /** Constructor. Uses the default separator of the LazySimpleSerDe.
+   * @param colNamesForFields Column name assignment for input fields. Nulls or empty
+   *                          strings in the array indicate fields to be skipped.
+   * @param delimiter input field delimiter
+   * @param endPoint Hive endpoint
+   * @throws ConnectionError Problem talking to Hive
+   * @throws ClassNotFoundException Serde class not found
+   * @throws SerializationError Serde initialization/interaction failed
+   * @throws StreamingException Problem acquiring file system path for partition
+   * @throws InvalidColumn any element in colNamesForFields refers to a non-existent column
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint)
+          throws ClassNotFoundException, ConnectionError, SerializationError,
+                 InvalidColumn, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, null);
+  }
+
+ /** Constructor. Uses the default separator of the LazySimpleSerDe.
+  * @param colNamesForFields Column name assignment for input fields. Nulls or empty
+  *                          strings in the array indicate fields to be skipped.
+  * @param delimiter input field delimiter
+  * @param endPoint Hive endpoint
+  * @param conf a Hive conf object. Can be null if not using advanced hive settings.
+  * @throws ConnectionError Problem talking to Hive
+  * @throws ClassNotFoundException Serde class not found
+  * @throws SerializationError Serde initialization/interaction failed
+  * @throws StreamingException Problem acquiring file system path for partition
+  * @throws InvalidColumn any element in colNamesForFields refers to a non-existent column
+  */
+   public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf)
+          throws ClassNotFoundException, ConnectionError, SerializationError,
+                 InvalidColumn, StreamingException {
+     this(colNamesForFields, delimiter, endPoint, conf,
+             (char) LazySimpleSerDe.DefaultSeparators[0]);
+   }
+
+  /**
+   * Constructor. Allows overriding separator of the LazySimpleSerde
+   * @param colNamesForFields Column name assignment for input fields
+   * @param delimiter input field delimiter
+   * @param endPoint Hive endpoint
+   * @param conf a Hive conf object. Set to null if not using advanced hive settings.
+   * @param serdeSeparator separator used when encoding data that is fed into the
+   *                             LazySimpleSerde. Ensure this separator does not occur
+   *                             in the field data
+   * @throws ConnectionError Problem talking to Hive
+   * @throws ClassNotFoundException Serde class not found
+   * @throws SerializationError Serde initialization/interaction failed
+   * @throws StreamingException Problem acquiring file system path for partition
+   * @throws InvalidColumn any element in colNamesForFields refers to a non-existent column
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
+          throws ClassNotFoundException, ConnectionError, SerializationError,
+                 InvalidColumn, StreamingException {
+    super(endPoint, conf);
+    this.tableColumns = getCols(tbl);
+    this.serdeSeparator = serdeSeparator;
+    this.delimiter = delimiter;
+    this.fieldToColMapping = getFieldReordering(colNamesForFields, getTableColumns());
+    this.reorderingNeeded = isReorderingNeeded(delimiter, getTableColumns());
+    LOG.debug("Field reordering needed = " + this.reorderingNeeded + ", for endpoint " + endPoint);
+  }
+
+  private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) {
+    return !( delimiter.equals(String.valueOf(getSerdeSeparator()))
+            && areFieldsInColOrder(fieldToColMapping)
+            && tableColumns.size()>=fieldToColMapping.length );
+  }
+
+  private static boolean areFieldsInColOrder(int[] fieldToColMapping) {
+    for(int i=0; i<fieldToColMapping.length; ++i) {
+      if(fieldToColMapping[i]!=i) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @VisibleForTesting
+  static int[] getFieldReordering(String[] colNamesForFields, List<String> tableColNames)
+          throws InvalidColumn {
+    int[] result = new int[ colNamesForFields.length ];
+    for(int i=0; i<colNamesForFields.length; ++i) {
+      result[i] = -1;
+    }
+    int i=-1, fieldLabelCount=0;
+    for( String col : colNamesForFields ) {
+      ++i;
+      if(col == null) {
+        continue;
+      }
+      if( col.trim().isEmpty() ) {
+        continue;
+      }
+      ++fieldLabelCount;
+      int loc = tableColNames.indexOf(col);
+      if(loc == -1) {
+        throw new InvalidColumn("Column '" + col + "' not found in table for input field " + i+1);
+      }
+      result[i] = loc;
+    }
+    if(fieldLabelCount>tableColNames.size()) {
+      throw new InvalidColumn("Number of field names exceeds the number of columns in table");
+    }
+    return result;
+  }
+
+  // Reorder fields in record based on the order of columns in the table
+  protected byte[] reorderFields(byte[] record) throws UnsupportedEncodingException {
+    if(!reorderingNeeded) {
+      return record;
+    }
+    String[] reorderedFields = new String[getTableColumns().size()];
+    String decoded = new String(record);
+    String[] fields = decoded.split(delimiter);
+    for (int i=0; i<fieldToColMapping.length; ++i) {
+      int newIndex = fieldToColMapping[i];
+      if(newIndex != -1) {
+        reorderedFields[newIndex] = fields[i];
+      }
+    }
+    return join(reorderedFields,getSerdeSeparator());
+  }
+
+  // handles nulls in items[]
+  // TODO: perhaps can be made more efficient by creating a byte[] directly
+  private static byte[] join(String[] items, char separator) {
+    StringBuffer buff = new StringBuffer(100);
+    if(items.length == 0)
+      return "".getBytes();
+    int i=0;
+    for(; i<items.length-1; ++i) {
+      if(items[i]!=null) {
+        buff.append(items[i]);
+      }
+      buff.append(separator);
+    }
+    if(items[i]!=null) {
+      buff.append(items[i]);
+    }
+    return buff.toString().getBytes();
+  }
+
+  protected ArrayList<String> getTableColumns() {
+    return tableColumns;
+  }
+
+  @Override
+  public void write(long transactionId, byte[] record)
+          throws SerializationError, StreamingIOFailure {
+    try {
+      byte[] orderedFields = reorderFields(record);
+      Object encodedRow = encode(orderedFields);
+      updater.insert(transactionId, encodedRow);
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Error writing record in transaction ("
+              + transactionId + ")", e);
+    }
+  }
+
+  @Override
+  SerDe getSerde() throws SerializationError {
+    if(serde!=null) {
+      return serde;
+    }
+    serde = createSerde(tbl, conf);
+    return serde;
+  }
+
+  private Object encode(byte[] record) throws SerializationError {
+    try {
+      BytesWritable blob = new BytesWritable();
+      blob.set(record, 0, record.length);
+      return serde.deserialize(blob);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to convert byte[] record into Object", e);
+    }
+  }
+
+  /**
+   * Creates a LazySimpleSerDe for the given table.
+   * @param tbl the table whose metadata is used to initialize the serde
+   * @param conf Hive configuration
+   * @return an initialized LazySimpleSerDe
+   * @throws SerializationError if the serde could not be initialized
+   */
+  protected LazySimpleSerDe createSerde(Table tbl, HiveConf conf)
+          throws SerializationError {
+    try {
+      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      tableProps.setProperty("field.delim", String.valueOf(serdeSeparator));
+      LazySimpleSerDe serde = new LazySimpleSerDe();
+      serde.initialize(conf, tableProps);
+      return serde;
+    } catch (SerDeException e) {
+      throw new SerializationError("Error initializing serde", e);
+    }
+  }
+
+  private ArrayList<String> getCols(Table table) {
+    List<FieldSchema> cols = table.getSd().getCols();
+    ArrayList<String> colNames = new ArrayList<String>(cols.size());
+    for (FieldSchema col : cols) {
+      colNames.add(col.getName().toLowerCase());
+    }
+    return colNames;
+  }
+
+  public char getSerdeSeparator() {
+    return serdeSeparator;
+  }
+}
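
A short, hypothetical illustration of the field-to-column mapping this writer
performs; the table and column names are invented for the example, and the
constructor contacts a live metastore when run:

    import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
    import org.apache.hive.hcatalog.streaming.HiveEndPoint;

    class MappingExample {
      static DelimitedInputWriter build() throws Exception {
        // Hypothetical table 'clicks' with columns (userid, page). The incoming
        // CSV has an extra middle field, skipped by passing null in
        // colNamesForFields. A record "u42,debug,/index.html" is reordered to
        // "u42" + ^A + "/index.html" (LazySimpleSerDe's default separator)
        // before being encoded.
        HiveEndPoint ep = new HiveEndPoint("thrift://localhost:9083",
                "default", "clicks", null);
        return new DelimitedInputWriter(new String[]{"userid", null, "page"}, ",", ep);
      }
    }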

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HeartBeatFailure.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HeartBeatFailure.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HeartBeatFailure.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HeartBeatFailure.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import java.util.Collection;
+import java.util.Set;
+
+public class HeartBeatFailure extends StreamingException {
+  private Collection<Long> abortedTxns;
+  private Collection<Long> nosuchTxns;
+
+  public HeartBeatFailure(Collection<Long> abortedTxns, Set<Long> nosuchTxns) {
+    super("Heart beat error. InvalidTxns: " + nosuchTxns + ". AbortedTxns: " + abortedTxns);
+    this.abortedTxns = abortedTxns;
+    this.nosuchTxns = nosuchTxns;
+  }
+}
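
ConnectionError and HeartBeatFailure above both extend StreamingException.
Assuming, as their declarations suggest, that TransactionError and
ImpersonationFailed follow the same hierarchy, a client can recover per
failure type; a minimal hypothetical sketch:

    static void commitOrAbort(TransactionBatch txnBatch)
            throws StreamingException, InterruptedException {
      try {
        txnBatch.commit();
      } catch (TransactionError e) {
        txnBatch.abort();   // discard the open transaction, keep the connection
      }
      // ConnectionError, SerializationError, StreamingIOFailure, etc. all
      // extend StreamingException, so they propagate through the throws clause.
    }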

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,823 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Information about the Hive end point (i.e. table or partition) to write to.
+ * A lightweight object that does NOT internally hold on to resources such as
+ * network connections. It can be stored in hashed containers such as sets and hash maps.
+ */
+public class HiveEndPoint {
+  public final String metaStoreUri;
+  public final String database;
+  public final String table;
+  public final ArrayList<String> partitionVals;
+
+
+  static final private Log LOG = LogFactory.getLog(HiveEndPoint.class.getName());
+
+  /**
+   *
+   * @param metaStoreUri   URI of the metastore to connect to, e.g. thrift://localhost:9083
+   * @param database       Name of the Hive database
+   * @param table          Name of table to stream to
+   * @param partitionVals  Indicates the specific partition to stream to. Can be null or empty List
+   *                       if streaming to a table without partitions. The order of values in this
+   *                       list must correspond exactly to the order of partition columns specified
+   *                       during the table creation. E.g. For a table partitioned by
+   *                       (continent string, country string), partitionVals could be the list
+   *                       ("Asia", "India").
+   */
+  public HiveEndPoint(String metaStoreUri
+          , String database, String table, List<String> partitionVals) {
+    this.metaStoreUri = metaStoreUri;
+    if (database==null) {
+      throw new IllegalArgumentException("Database cannot be null for HiveEndPoint");
+    }
+    this.database = database;
+    this.table = table;
+    if (table==null) {
+      throw new IllegalArgumentException("Table cannot be null for HiveEndPoint");
+    }
+    this.partitionVals = partitionVals==null ? new ArrayList<String>()
+                                             : new ArrayList<String>( partitionVals );
+  }
+
+
+  /**
+   * Acquire a new connection to MetaStore for streaming
+   * @param createPartIfNotExists If true, the partition specified in the endpoint
+   *                              will be auto created if it does not exist
+   * @return a new StreamingConnection to the end point
+   * @throws ConnectionError if problem connecting
+   * @throws InvalidPartition  if specified partition is not valid (createPartIfNotExists = false)
+   * @throws ImpersonationFailed  if not able to impersonate 'proxyUser'
+   * @throws IOException  if there was an I/O error when acquiring connection
+   * @throws PartitionCreationFailed if failed to create partition
+   * @throws InterruptedException
+   */
+  public StreamingConnection newConnection(final boolean createPartIfNotExists)
+          throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+          , ImpersonationFailed , InterruptedException {
+    return newConnection(null, createPartIfNotExists, null);
+  }
+
+  /**
+   * Acquire a new connection to MetaStore for streaming
+   * @param createPartIfNotExists If true, the partition specified in the endpoint
+   *                              will be auto created if it does not exist
+   * @param conf HiveConf object, set it to null if not using advanced hive settings.
+   * @return a new StreamingConnection to the end point
+   * @throws ConnectionError if problem connecting
+   * @throws InvalidPartition  if specified partition is not valid (createPartIfNotExists = false)
+   * @throws ImpersonationFailed  if not able to impersonate 'proxyUser'
+   * @throws IOException  if there was an I/O error when acquiring connection
+   * @throws PartitionCreationFailed if failed to create partition
+   * @throws InterruptedException
+   */
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
+          throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+          , ImpersonationFailed , InterruptedException {
+    return newConnection(null, createPartIfNotExists, conf);
+  }
+
+  /**
+   * Acquire a new connection to MetaStore for streaming
+   * @param proxyUser User on whose behalf all hdfs and hive operations will be
+   *                  performed on this connection. Set it to null or empty string
+   *                  to connect as user of current process without impersonation.
+   *                  Currently this argument is not supported and must be null
+   * @param createPartIfNotExists If true, the partition specified in the endpoint
+   *                              will be auto created if it does not exist
+   * @return a new StreamingConnection to the end point
+   * @throws ConnectionError if problem connecting
+   * @throws InvalidPartition  if specified partition is not valid (createPartIfNotExists = false)
+   * @throws ImpersonationFailed  if not able to impersonate 'proxyUser'
+   * @throws IOException  if there was an I/O error when acquiring connection
+   * @throws PartitionCreationFailed if failed to create partition
+   * @throws InterruptedException
+   */
+  private StreamingConnection newConnection(final String proxyUser,
+                                            final boolean createPartIfNotExists, final HiveConf conf)
+          throws ConnectionError, InvalidPartition,
+               InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
+    if (proxyUser ==null || proxyUser.trim().isEmpty() ) {
+      return newConnectionImpl(System.getProperty("user.name"), null, createPartIfNotExists, conf);
+    }
+    final UserGroupInformation ugi = getUserGroupInfo(proxyUser);
+    try {
+      return ugi.doAs (
+              new PrivilegedExceptionAction<StreamingConnection>() {
+                @Override
+                public StreamingConnection run()
+                        throws ConnectionError, InvalidPartition, InvalidTable
+                        , PartitionCreationFailed {
+                  return newConnectionImpl(proxyUser, ugi, createPartIfNotExists, conf);
+                }
+              }
+      );
+    } catch (IOException e) {
+      throw new ImpersonationFailed("Failed to impersonate '" + proxyUser +
+              "' when acquiring connection", e);
+    }
+  }
+
+
+
+  private StreamingConnection newConnectionImpl(String proxyUser, UserGroupInformation ugi,
+                                               boolean createPartIfNotExists, HiveConf conf)
+          throws ConnectionError, InvalidPartition, InvalidTable
+          , PartitionCreationFailed {
+    return new ConnectionImpl(this, proxyUser, ugi, conf, createPartIfNotExists);
+  }
+
+  private static UserGroupInformation getUserGroupInfo(String proxyUser)
+          throws ImpersonationFailed {
+    try {
+      return UserGroupInformation.createProxyUser(
+              proxyUser, UserGroupInformation.getLoginUser());
+    } catch (IOException e) {
+      LOG.error("Unable to login as proxy user. Exception follows.", e);
+      throw new ImpersonationFailed(proxyUser,e);
+    }
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    HiveEndPoint endPoint = (HiveEndPoint) o;
+
+    if (database != null
+            ? !database.equals(endPoint.database)
+            : endPoint.database != null ) {
+      return false;
+    }
+    if (metaStoreUri != null
+            ? !metaStoreUri.equals(endPoint.metaStoreUri)
+            : endPoint.metaStoreUri != null ) {
+      return false;
+    }
+    if (!partitionVals.equals(endPoint.partitionVals)) {
+      return false;
+    }
+    if (table != null ? !table.equals(endPoint.table) : endPoint.table != null) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = metaStoreUri != null ? metaStoreUri.hashCode() : 0;
+    result = 31 * result + (database != null ? database.hashCode() : 0);
+    result = 31 * result + (table != null ? table.hashCode() : 0);
+    result = 31 * result + partitionVals.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "{" +
+            "metaStoreUri='" + metaStoreUri + '\'' +
+            ", database='" + database + '\'' +
+            ", table='" + table + '\'' +
+            ", partitionVals=" + partitionVals + " }";
+  }
+
+
+  private static class ConnectionImpl implements StreamingConnection {
+    private final IMetaStoreClient msClient;
+    private final HiveEndPoint endPt;
+    private final String proxyUser;
+    private final UserGroupInformation ugi;
+
+    /**
+     *
+     * @param endPoint end point to connect to
+     * @param proxyUser  can be null
+     * @param ugi ugi of the proxy user. If null, impersonation of the proxy user is disabled
+     * @param conf HiveConf object
+     * @param createPart create the partition if it does not exist
+     * @throws ConnectionError if there is trouble connecting
+     * @throws InvalidPartition if specified partition does not exist (and createPart=false)
+     * @throws InvalidTable if specified table does not exist
+     * @throws PartitionCreationFailed if createPart=true and not able to create partition
+     */
+    private ConnectionImpl(HiveEndPoint endPoint, String proxyUser, UserGroupInformation ugi,
+                           HiveConf conf, boolean createPart)
+            throws ConnectionError, InvalidPartition, InvalidTable
+                   , PartitionCreationFailed {
+      this.proxyUser = proxyUser;
+      this.endPt = endPoint;
+      this.ugi = ugi;
+      if (conf==null) {
+        conf = HiveEndPoint.createHiveConf(this.getClass(),endPoint.metaStoreUri);
+      }
+      this.msClient = getMetaStoreClient(endPoint, conf);
+      if (createPart  &&  !endPoint.partitionVals.isEmpty()) {
+        createPartitionIfNotExists(endPoint, msClient, conf);
+      }
+    }
+
+    /**
+     * Close connection
+     */
+    @Override
+    public void close() {
+      if (ugi==null) {
+        msClient.close();
+        return;
+      }
+      try {
+        ugi.doAs (
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                msClient.close();
+                return null;
+              }
+            } );
+      } catch (IOException e) {
+        LOG.error("Error closing connection to " + endPt, e);
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted when closing connection to " + endPt, e);
+      }
+    }
+
+
+    /**
+     * Acquires a new batch of transactions from Hive.
+     *
+     * @param numTransactions a hint from the client indicating how many transactions it needs.
+     * @param recordWriter  used to write records. The same writer instance can
+     *                      be shared with another TransactionBatch (to the same endpoint)
+     *                      only after the first TransactionBatch has been closed.
+     *                      The writer will be closed when the TransactionBatch is closed.
+     * @return a new TransactionBatch
+     * @throws StreamingIOFailure if failed to create new RecordUpdater for batch
+     * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
+     * @throws ImpersonationFailed failed to run command as proxyUser
+     * @throws InterruptedException
+     */
+    public TransactionBatch fetchTransactionBatch(final int numTransactions,
+                                                      final RecordWriter recordWriter)
+            throws StreamingException, TransactionBatchUnAvailable, ImpersonationFailed
+                  , InterruptedException {
+      if (ugi==null) {
+        return fetchTransactionBatchImpl(numTransactions, recordWriter);
+      }
+      try {
+        return ugi.doAs (
+                new PrivilegedExceptionAction<TransactionBatch>() {
+                  @Override
+                  public TransactionBatch run() throws StreamingException {
+                    return fetchTransactionBatchImpl(numTransactions, recordWriter);
+                  }
+                }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
+                "' when acquiring Transaction Batch on endPoint " + endPt, e);
+      }
+    }
+
+    private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
+                                                  RecordWriter recordWriter)
+            throws StreamingException, TransactionBatchUnAvailable {
+      return new TransactionBatchImpl(proxyUser, ugi, endPt, numTransactions, msClient
+              , recordWriter);
+    }
+
+
+    private static void createPartitionIfNotExists(HiveEndPoint ep,
+                                                   IMetaStoreClient msClient, HiveConf conf)
+            throws InvalidTable, PartitionCreationFailed {
+      if (ep.partitionVals.isEmpty()) {
+        return;
+      }
+      SessionState state = SessionState.start(new CliSessionState(conf));
+      Driver driver = new Driver(conf);
+
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Attempting to create partition (if not existent) " + ep);
+        }
+
+        List<FieldSchema> partKeys = msClient.getTable(ep.database, ep.table)
+                .getPartitionKeys();
+        runDDL(driver, "use " + ep.database);
+        String query = "alter table " + ep.table + " add if not exists partition "
+                + partSpecStr(partKeys, ep.partitionVals);
+        runDDL(driver, query);
+      } catch (MetaException e) {
+        LOG.error("Failed to create partition : " + ep, e);
+        throw new PartitionCreationFailed(ep, e);
+      } catch (NoSuchObjectException e) {
+        LOG.error("Failed to create partition : " + ep, e);
+        throw new InvalidTable(ep.database, ep.table);
+      } catch (TException e) {
+        LOG.error("Failed to create partition : " + ep, e);
+        throw new PartitionCreationFailed(ep, e);
+      } catch (QueryFailedException e) {
+        LOG.error("Failed to create partition : " + ep, e);
+        throw new PartitionCreationFailed(ep, e);
+      } finally {
+        driver.close();
+        try {
+          state.close();
+        } catch (IOException e) {
+          LOG.warn("Error closing SessionState used to run Hive DDL.");
+        }
+      }
+    }
+
+    private static boolean runDDL(Driver driver, String sql) throws QueryFailedException {
+      int retryCount = 1; // # of times to retry if first attempt fails
+      for (int attempt=0; attempt<=retryCount; ++attempt) {
+        try {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Running Hive Query: "+ sql);
+          }
+          driver.run(sql);
+          return true;
+        } catch (CommandNeedRetryException e) {
+          if (attempt==retryCount) {
+            throw new QueryFailedException(sql, e);
+          }
+          continue;
+        }
+      } // for
+      return false;
+    }
+
+    private static String partSpecStr(List<FieldSchema> partKeys, ArrayList<String> partVals) {
+      if (partKeys.size()!=partVals.size()) {
+        throw new IllegalArgumentException("Partition values:" + partVals +
+                ", does not match the partition Keys in table :" + partKeys );
+      }
+      StringBuffer buff = new StringBuffer(partKeys.size()*20);
+      buff.append(" ( ");
+      int i=0;
+      for (FieldSchema schema : partKeys) {
+        buff.append(schema.getName());
+        buff.append("='");
+        buff.append(partVals.get(i));
+        buff.append("'");
+        if (i!=partKeys.size()-1) {
+          buff.append(",");
+        }
+        ++i;
+      }
+      buff.append(" )");
+      return buff.toString();
+    }
+
+    private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf)
+            throws ConnectionError {
+
+      if (endPoint.metaStoreUri!= null) {
+        conf.setVar(HiveConf.ConfVars.METASTOREURIS, endPoint.metaStoreUri);
+      }
+
+      try {
+        return new HiveMetaStoreClient(conf);
+      } catch (MetaException e) {
+        throw new ConnectionError("Error connecting to Hive Metastore URI: "
+                + endPoint.metaStoreUri, e);
+      }
+    }
+
+
+  } // class ConnectionImpl
+
+  private static class TransactionBatchImpl implements TransactionBatch {
+    private final String proxyUser;
+    private final UserGroupInformation ugi;
+    private final HiveEndPoint endPt;
+    private final IMetaStoreClient msClient;
+    private final RecordWriter recordWriter;
+    private final List<Long> txnIds;
+
+    private int currentTxnIndex;
+    private final String partNameForLock;
+
+    private TxnState state;
+    private LockRequest lockRequest = null;
+
+    /**
+     * Represents a batch of transactions acquired from MetaStore
+     *
+     * @param proxyUser
+     * @param ugi
+     * @param endPt
+     * @param numTxns
+     * @param msClient
+     * @param recordWriter
+     * @throws StreamingException if failed to create new RecordUpdater for batch
+     * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
+     */
+    private TransactionBatchImpl(String proxyUser, UserGroupInformation ugi, HiveEndPoint endPt
+              , int numTxns, IMetaStoreClient msClient, RecordWriter recordWriter)
+            throws StreamingException, TransactionBatchUnAvailable {
+      try {
+        if ( endPt.partitionVals!=null   &&   !endPt.partitionVals.isEmpty() ) {
+          Table tableObj = msClient.getTable(endPt.database, endPt.table);
+          List<FieldSchema> partKeys = tableObj.getPartitionKeys();
+          partNameForLock = Warehouse.makePartName(partKeys, endPt.partitionVals);
+        } else {
+          partNameForLock = null;
+        }
+        this.proxyUser = proxyUser;
+        this.ugi = ugi;
+        this.endPt = endPt;
+        this.msClient = msClient;
+        this.recordWriter = recordWriter;
+        this.txnIds = msClient.openTxns(proxyUser, numTxns).getTxn_ids();
+        this.currentTxnIndex = -1;
+        this.state = TxnState.INACTIVE;
+        recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1));
+      } catch (TException e) {
+        throw new TransactionBatchUnAvailable(endPt, e);
+      }
+    }
+
+    @Override
+    public String toString() {
+      if (txnIds==null || txnIds.isEmpty()) {
+        return "{}";
+      }
+      return "TxnIds=[" + txnIds.get(0) + "src/gen/thrift" + txnIds.get(txnIds.size()-1)
+              + "] on endPoint= " + endPt;
+    }
+
+    /**
+     * Activate the next available transaction in the current transaction batch
+     * @throws TransactionError failed to switch to next transaction
+     */
+    @Override
+    public void beginNextTransaction() throws TransactionError, ImpersonationFailed,
+            InterruptedException {
+      if (ugi==null) {
+        beginNextTransactionImpl();
+        return;
+      }
+      try {
+        ugi.doAs (
+              new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws TransactionError {
+                  beginNextTransactionImpl();
+                  return null;
+                }
+              }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser +
+                "' when switch to next Transaction for endPoint :" + endPt, e);
+      }
+    }
+
+    private void beginNextTransactionImpl() throws TransactionError {
+      // currentTxnIndex is 0-based; the last usable index is txnIds.size()-1
+      if ( currentTxnIndex + 1 >= txnIds.size() ) {
+        throw new InvalidTrasactionState("No more transactions available in" +
+                " current batch for end point : " + endPt);
+      }
+      ++currentTxnIndex;
+      lockRequest = createLockRequest(endPt, partNameForLock, proxyUser, getCurrentTxnId());
+      try {
+        LockResponse res = msClient.lock(lockRequest);
+        if (res.getState() != LockState.ACQUIRED) {
+          throw new TransactionError("Unable to acquire lock on " + endPt);
+        }
+      } catch (TException e) {
+        throw new TransactionError("Unable to acquire lock on " + endPt, e);
+      }
+
+      state = TxnState.OPEN;
+    }
+
+    /**
+     * Get the id of the currently open transaction.
+     * @return the current transaction id
+     */
+    @Override
+    public Long getCurrentTxnId() {
+      return txnIds.get(currentTxnIndex);
+    }
+
+    /**
+     * Get the state of the current transaction.
+     * @return the current transaction state
+     */
+    @Override
+    public TxnState getCurrentTransactionState() {
+      return state;
+    }
+
+    /**
+     * Remaining transactions are those that have not yet been started.
+     * The currently active transaction is not counted among them.
+     * @return number of transactions remaining in this batch.
+     */
+    @Override
+    public int remainingTransactions() {
+      if (currentTxnIndex>=0) {
+        return txnIds.size() - currentTxnIndex -1;
+      }
+      return txnIds.size();
+    }
+
+
+    /**
+     *  Write record using RecordWriter
+     * @param record  the data to be written
+     * @throws StreamingIOFailure I/O failure
+     * @throws SerializationError  serialization error
+     * @throws ImpersonationFailed error writing on behalf of proxyUser
+     * @throws InterruptedException
+     */
+    @Override
+    public void write(final byte[] record)
+            throws StreamingException, InterruptedException,
+            ImpersonationFailed {
+      if (ugi==null) {
+        recordWriter.write(getCurrentTxnId(), record);
+        return;
+      }
+      try {
+        ugi.doAs (
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws StreamingException {
+                recordWriter.write(getCurrentTxnId(), record);
+                return null;
+              }
+            }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
+                "' when writing to endPoint :" + endPt + ". Transaction Id: "
+                + getCurrentTxnId(), e);
+      }
+    }
+
+
+    /**
+     * Write a collection of records using the RecordWriter
+     * @param records collection of rows to be written
+     * @throws StreamingException serialization or I/O error
+     * @throws ImpersonationFailed error writing on behalf of proxyUser
+     * @throws InterruptedException if the write is interrupted
+     */
+    @Override
+    public void write(final Collection<byte[]> records)
+            throws StreamingException, InterruptedException,
+            ImpersonationFailed {
+      if (ugi==null) {
+        writeImpl(records);
+        return;
+      }
+      try {
+        ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+                  @Override
+                  public Void run() throws StreamingException {
+                    writeImpl(records);
+                    return null;
+                  }
+                }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
+                "' when writing to endPoint :" + endPt + ". Transaction Id: "
+                + getCurrentTxnId(), e);
+      }
+    }
+
+    private void writeImpl(Collection<byte[]> records)
+            throws StreamingException {
+      for (byte[] record : records) {
+        recordWriter.write(getCurrentTxnId(), record);
+      }
+    }
+
+
+    /**
+     * Commit the currently open transaction
+     * @throws TransactionError if the commit fails
+     * @throws StreamingIOFailure  if flushing records failed
+     * @throws ImpersonationFailed if committing on behalf of proxyUser fails
+     * @throws InterruptedException if the commit is interrupted
+     */
+    @Override
+    public void commit()  throws TransactionError, StreamingException,
+           ImpersonationFailed, InterruptedException {
+      if (ugi==null) {
+        commitImpl();
+        return;
+      }
+      try {
+        ugi.doAs (
+              new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws StreamingException {
+                  commitImpl();
+                  return null;
+                }
+              }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
+                "' when committing Txn on endPoint :" + endPt + ". Transaction Id: "
+                + getCurrentTxnId(), e);
+      }
+    }
+
+    private void commitImpl() throws TransactionError, StreamingException {
+      try {
+        recordWriter.flush();
+        msClient.commitTxn(txnIds.get(currentTxnIndex));
+        state = TxnState.COMMITTED;
+      } catch (NoSuchTxnException e) {
+        throw new TransactionError("Invalid transaction id : "
+                + getCurrentTxnId(), e);
+      } catch (TxnAbortedException e) {
+        throw new TransactionError("Aborted transaction cannot be committed"
+                , e);
+      } catch (TException e) {
+        throw new TransactionError("Unable to commit transaction"
+                + getCurrentTxnId(), e);
+      }
+    }
+
+    /**
+     * Abort the currently open transaction
+     * @throws TransactionError if the abort fails
+     */
+    @Override
+    public void abort() throws TransactionError, StreamingException
+                      , ImpersonationFailed, InterruptedException {
+      if (ugi==null) {
+        abortImpl();
+        return;
+      }
+      try {
+        ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+                  @Override
+                  public Void run() throws StreamingException {
+                    abortImpl();
+                    return null;
+                  }
+                }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
+                "' when aborting Txn on endPoint :" + endPt + ". Transaction Id: "
+                + getCurrentTxnId(), e);
+      }
+    }
+
+    private void abortImpl() throws TransactionError, StreamingException {
+      try {
+        recordWriter.clear();
+        msClient.rollbackTxn(getCurrentTxnId());
+        state = TxnState.ABORTED;
+      } catch (NoSuchTxnException e) {
+        throw new TransactionError("Unable to abort invalid transaction id : "
+                + getCurrentTxnId(), e);
+      } catch (TException e) {
+        throw new TransactionError("Unable to abort transaction id : "
+                + getCurrentTxnId(), e);
+      }
+    }
+
+    @Override
+    public void heartbeat() throws StreamingException, HeartBeatFailure {
+      // Guard against currentTxnIndex == -1 before the first transaction is begun
+      Long first = txnIds.get(currentTxnIndex < 0 ? 0 : currentTxnIndex);
+      Long last = txnIds.get(txnIds.size() - 1);
+      try {
+        HeartbeatTxnRangeResponse resp = msClient.heartbeatTxnRange(first, last);
+        if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
+          throw new HeartBeatFailure(resp.getAborted(), resp.getNosuch());
+        }
+      } catch (TException e) {
+        throw new StreamingException("Failure to heartbeat on ids (" + first + "src/gen/thrift"
+                + last + ") on end point : " + endPt );
+      }
+    }
+
+    /**
+     * Close the TransactionBatch
+     * @throws StreamingIOFailure I/O failure when closing transaction batch
+     */
+    @Override
+    public void close() throws StreamingException, ImpersonationFailed, InterruptedException {
+      if (ugi==null) {
+        state = TxnState.INACTIVE;
+        recordWriter.closeBatch();
+        return;
+      }
+      try {
+        ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+                  @Override
+                  public Void run() throws StreamingException {
+                    state = TxnState.INACTIVE;
+                    recordWriter.closeBatch();
+                    return null;
+                  }
+                }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
+                "' when closing Txn Batch on endPoint :" + endPt, e);
+      }
+    }
+
+    private static LockRequest createLockRequest(final HiveEndPoint hiveEndPoint,
+            String partNameForLock, String user, long txnId)  {
+      LockRequestBuilder rqstBuilder = new LockRequestBuilder();
+      rqstBuilder.setUser(user);
+      rqstBuilder.setTransactionId(txnId);
+
+      LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+              .setDbName(hiveEndPoint.database)
+              .setTableName(hiveEndPoint.table)
+              .setShared();
+      if (partNameForLock != null && !partNameForLock.isEmpty()) {
+        lockCompBuilder.setPartitionName(partNameForLock);
+      }
+      rqstBuilder.addLockComponent(lockCompBuilder.build());
+
+      return rqstBuilder.build();
+    }
+  } // class TransactionBatchImpl
+
+  static HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) {
+    HiveConf conf = new HiveConf(clazz);
+    conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER,
+            "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+    conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+    if (metaStoreUri != null) {
+      conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
+    }
+    return conf;
+  }
+
+}  // class HiveEndPoint
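
A minimal sketch of the intended transaction-batch lifecycle around the class above: connect to an end point, fetch a batch, then begin/write/commit each transaction before closing. The HiveEndPoint and DelimitedInputWriter constructors and the newConnection(boolean) signature are assumptions based on the other files added in this commit, not code shown in this hunk:

    // Hypothetical end-to-end usage; the endpoint, connection and writer
    // constructor signatures below are assumed, not taken from this hunk.
    import java.util.Arrays;
    import org.apache.hive.hcatalog.streaming.*;

    public class StreamingLifecycleSketch {
      public static void main(String[] args) throws Exception {
        HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083",   // assumed ctor
            "default", "alerts", Arrays.asList("2014", "04"));
        StreamingConnection conn = endPt.newConnection(true);              // assumed signature
        RecordWriter writer = new DelimitedInputWriter(                    // assumed ctor
            new String[]{"id", "msg"}, ",", endPt);
        TransactionBatch batch = conn.fetchTransactionBatch(10, writer);
        try {
          while (batch.remainingTransactions() > 0) {
            batch.beginNextTransaction();        // acquires the shared lock for this txn
            batch.write("1,hello".getBytes());
            batch.commit();                      // flushes the writer, then commits the txn
          }
        } finally {
          batch.close();                         // marks the batch inactive, closes the writer
          conn.close();
        }
      }
    }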

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ImpersonationFailed.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ImpersonationFailed.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ImpersonationFailed.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ImpersonationFailed.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+public class ImpersonationFailed extends StreamingException {
+  public ImpersonationFailed(String username, Exception e) {
+    super("Failed to impersonate user " + username, e);
+  }
+}

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidColumn.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidColumn.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidColumn.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidColumn.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+public class InvalidColumn extends StreamingException {
+
+  public InvalidColumn(String msg) {
+    super(msg);
+  }
+}

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidPartition.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidPartition.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidPartition.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidPartition.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+public class InvalidPartition extends StreamingException {
+
+  public InvalidPartition(String partitionName, String partitionValue) {
+    super("Invalid partition: Name=" + partitionName +
+            ", Value=" + partitionValue);
+  }
+
+}

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+public class InvalidTable extends StreamingException {
+
+  private static String makeMsg(String db, String table) {
+    return "Invalid table db:" + db + ", table:" + table;
+  }
+
+  public InvalidTable(String db, String table) {
+    super(makeMsg(db, table));
+  }
+}

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTrasactionState.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTrasactionState.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTrasactionState.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTrasactionState.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+public class InvalidTrasactionState extends TransactionError {
+  public InvalidTrasactionState(String msg) {
+    super(msg);
+  }
+
+}

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/PartitionCreationFailed.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/PartitionCreationFailed.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/PartitionCreationFailed.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/PartitionCreationFailed.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+public class PartitionCreationFailed extends StreamingException {
+  public PartitionCreationFailed(HiveEndPoint endPoint, Exception cause) {
+    super("Failed to create partition " + endPoint, cause);
+  }
+}

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/QueryFailedException.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/QueryFailedException.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/QueryFailedException.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/QueryFailedException.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+
+public class QueryFailedException extends StreamingException {
+  private final String query;
+  public QueryFailedException(String query, CommandNeedRetryException e) {
+    super("Query failed: " + query + ". Due to :" + e.getMessage(), e);
+    this.query = query;
+  }
+}

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+
+public interface RecordWriter {
+
+  /** Writes a record using a Hive RecordUpdater
+   *
+   * @param transactionId the ID of the Txn in which the write occurs
+   * @param record the record to be written
+   */
+  public void write(long transactionId, byte[] record) throws StreamingException;
+
+  /** Flush records from buffer. Invoked by TransactionBatch.commit() */
+  public void flush() throws StreamingException;
+
+  /** Clear buffered writes. Invoked by TransactionBatch.abort() */
+  public void clear() throws StreamingException;
+
+  /** Acquire a new RecordUpdater. Invoked when
+   * StreamingConnection.fetchTransactionBatch() is called */
+  public void newBatch(Long minTxnId, Long maxTxnId) throws StreamingException;
+
+  /** Close the RecordUpdater. Invoked by TransactionBatch.close() */
+  public void closeBatch() throws StreamingException;
+}
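
A sketch of a trivial implementation, only to illustrate when each callback of the interface above fires; it is hypothetical, buffers nothing real, and does not drive a Hive RecordUpdater:

    // Hypothetical no-op implementation of the RecordWriter contract above.
    package org.apache.hive.hcatalog.streaming;

    public class CountingRecordWriter implements RecordWriter {
      private long buffered = 0;  // bytes accepted since the last flush()/clear()
      private long flushed = 0;   // bytes handed off by flush()

      @Override
      public void write(long transactionId, byte[] record) throws StreamingException {
        buffered += record.length;      // a real writer hands the record to a RecordUpdater
      }

      @Override
      public void flush() throws StreamingException {
        flushed += buffered;            // invoked from TransactionBatch.commit()
        buffered = 0;
      }

      @Override
      public void clear() throws StreamingException {
        buffered = 0;                   // invoked from TransactionBatch.abort()
      }

      @Override
      public void newBatch(Long minTxnId, Long maxTxnId) throws StreamingException {
        // a real writer acquires a RecordUpdater for the [minTxnId, maxTxnId] range
      }

      @Override
      public void closeBatch() throws StreamingException {
        // a real writer closes its RecordUpdater here
      }
    }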

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/SerializationError.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/SerializationError.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/SerializationError.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/SerializationError.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+
+public class SerializationError extends StreamingException {
+  public SerializationError(String msg, Exception e) {
+    super(msg,e);
+  }
+}

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+/**
+ * Represents a connection to a HiveEndPoint. Used to acquire transaction batches.
+ */
+public interface StreamingConnection {
+
+  /**
+   * Acquires a new batch of transactions from Hive.
+   *
+   * @param numTransactionsHint a hint from the client indicating how many transactions it needs
+   * @param writer  used to write records. The same writer instance can
+   *                      be shared with another TransactionBatch (to the same endpoint)
+   *                      only after the first TransactionBatch has been closed.
+   *                      The writer will be closed when the TransactionBatch is closed.
+   * @return a batch of transactions
+   * @throws ConnectionError
+   * @throws InvalidPartition
+   * @throws StreamingException
+   */
+  public TransactionBatch fetchTransactionBatch(int numTransactionsHint,
+                                                RecordWriter writer)
+          throws ConnectionError, StreamingException, InterruptedException;
+
+  /**
+   * Close connection
+   */
+  public void close();
+
+}
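
A sketch of how a slow producer might combine fetchTransactionBatch() with TransactionBatch.heartbeat() so the metastore does not time out its locks; the 30-second interval and 1000-records-per-transaction chunking are arbitrary assumptions:

    // Hypothetical driver loop; the heartbeat interval and chunk size are assumed.
    import java.util.Iterator;
    import org.apache.hive.hcatalog.streaming.*;

    public class HeartbeatingProducer {
      static void writeAll(StreamingConnection conn, RecordWriter writer,
                           Iterator<byte[]> records)
          throws ConnectionError, TransactionError, HeartBeatFailure,
                 StreamingException, InterruptedException {
        TransactionBatch batch = conn.fetchTransactionBatch(100, writer);
        long lastBeat = System.currentTimeMillis();
        try {
          while (batch.remainingTransactions() > 0 && records.hasNext()) {
            batch.beginNextTransaction();
            for (int i = 0; i < 1000 && records.hasNext(); ++i) {
              batch.write(records.next());
              if (System.currentTimeMillis() - lastBeat > 30000) {
                batch.heartbeat();      // renews every txn from the current one to the last
                lastBeat = System.currentTimeMillis();
              }
            }
            batch.commit();
          }
        } finally {
          batch.close();
        }
      }
    }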

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingException.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingException.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingException.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingException.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+public class StreamingException extends Exception {
+  public StreamingException(String msg, Exception cause) {
+    super(msg, cause);
+  }
+  public StreamingException(String msg) {
+    super(msg);
+  }
+}

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingIOFailure.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingIOFailure.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingIOFailure.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingIOFailure.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+
+public class StreamingIOFailure extends StreamingException {
+
+  public StreamingIOFailure(String msg, Exception cause) {
+    super(msg, cause);
+  }
+
+  public StreamingIOFailure(String msg) {
+    super(msg);
+  }
+}