Posted to commits@hive.apache.org by ga...@apache.org on 2014/04/10 03:09:01 UTC

svn commit: r1586190 [2/2] - in /hive/branches/branch-0.13: hcatalog/ hcatalog/streaming/ hcatalog/streaming/src/ hcatalog/streaming/src/java/ hcatalog/streaming/src/java/org/ hcatalog/streaming/src/java/org/apache/ hcatalog/streaming/src/java/org/apac...

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.hcatalog.data.JsonSerDe;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Streaming writer that handles UTF-8 encoded JSON (strict syntax).
+ * Uses org.apache.hive.hcatalog.data.JsonSerDe to process the JSON input.
+ */
+public class StrictJsonWriter extends AbstractRecordWriter {
+  private JsonSerDe serde;
+
+  /**
+   *
+   * @param endPoint the end point to write to
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint)
+          throws ConnectionError, SerializationError, StreamingException {
+    super(endPoint, null);
+  }
+
+  /**
+   *
+   * @param endPoint the end point to write to
+   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf)
+          throws ConnectionError, SerializationError, StreamingException {
+    super(endPoint, conf);
+  }
+
+  @Override
+  SerDe getSerde() throws SerializationError {
+    if(serde!=null) {
+      return serde;
+    }
+    serde = createSerde(tbl, conf);
+    return serde;
+  }
+
+  @Override
+  public void write(long transactionId, byte[] record)
+          throws StreamingIOFailure, SerializationError {
+    try {
+      Object encodedRow = encode(record);
+      updater.insert(transactionId, encodedRow);
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Error writing record in transaction("
+              + transactionId + ")", e);
+    }
+
+  }
+
+  /**
+   * Creates JsonSerDe
+   * @param tbl   used to create serde
+   * @param conf  used to create serde
+   * @return the initialized JsonSerDe
+   * @throws SerializationError if serde could not be initialized
+   */
+  private static JsonSerDe createSerde(Table tbl, HiveConf conf)
+          throws SerializationError {
+    try {
+      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      JsonSerDe serde = new JsonSerDe();
+      serde.initialize(conf, tableProps);
+      return serde;
+    } catch (SerDeException e) {
+      throw new SerializationError("Error initializing serde " + JsonSerDe.class.getName(), e);
+    }
+  }
+
+  /**
+   * Encode UTF-8 encoded string bytes using the JsonSerDe
+   * @param utf8StrRecord the record as UTF-8 encoded bytes
+   * @return  the deserialized record object
+   * @throws SerializationError if the record could not be deserialized
+   */
+  private Object encode(byte[] utf8StrRecord) throws SerializationError {
+    try {
+      Text blob = new Text(utf8StrRecord);
+      return serde.deserialize(blob);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to convert byte[] record into Object", e);
+    }
+  }
+
+}

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+
+import java.util.Collection;
+
+/**
+ * Represents a set of transactions returned by Hive. Supports opening, writing to,
+ * and committing/aborting each transaction. The interface is designed to ensure that
+ * transactions in a batch are consumed sequentially. Multiple transaction batches can be
+ * used (initialized with separate RecordWriters) for concurrent streaming.
+ *
+ */
+public interface TransactionBatch  {
+  public enum TxnState {INACTIVE, OPEN, COMMITTED, ABORTED }
+
+  /**
+   * Activate the next available transaction in the current transaction batch
+   * @throws StreamingException if not able to switch to the next transaction
+   * @throws InterruptedException if the call is interrupted
+   */
+  public void beginNextTransaction() throws StreamingException, InterruptedException;
+
+  /**
+   * Get Id of currently open transaction
+   * @return transaction id
+   */
+  public Long getCurrentTxnId();
+
+  /**
+   * get state of current transaction
+   */
+  public TxnState getCurrentTransactionState();
+
+  /**
+   * Commit the currently open transaction
+   * @throws StreamingException if there are errors committing
+   * @throws InterruptedException if the call is interrupted
+   */
+  public void commit() throws StreamingException, InterruptedException;
+
+  /**
+   * Abort the currently open transaction
+   * @throws StreamingException if there are errors
+   * @throws InterruptedException if the call is interrupted
+   */
+  public void abort() throws StreamingException, InterruptedException;
+
+  /**
+   * Remaining transactions are the ones that are not committed or aborted or open.
+   * Current open transaction is not considered part of remaining txns.
+   * @return number of transactions remaining in this batch.
+   */
+  public int remainingTransactions();
+
+
+  /**
+   *  Write record using RecordWriter
+   * @param record  the data to be written
+   * @throws StreamingException if there are errors when writing
+   * @throws InterruptedException if the call is interrupted
+   */
+  public void write(byte[] record) throws StreamingException, InterruptedException;
+
+  /**
+   *  Write records using RecordWriter
+   * @param records  the records to be written
+   * @throws StreamingException if there are errors when writing
+   * @throws InterruptedException if the call is interrupted
+   */
+  public void write(Collection<byte[]> records) throws StreamingException, InterruptedException;
+
+
+  /**
+   * Issues a heartbeat to the Hive metastore on the current and remaining txn ids
+   * to keep them from expiring
+   * @throws StreamingException if there are errors
+   */
+  public void heartbeat() throws StreamingException;
+
+  /**
+   * Close the TransactionBatch
+   * @throws StreamingException if there are errors closing batch
+   * @throws InterruptedException if the call is interrupted
+   */
+  public void close() throws StreamingException, InterruptedException;
+}

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatchUnAvailable.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatchUnAvailable.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatchUnAvailable.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatchUnAvailable.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+public class TransactionBatchUnAvailable extends StreamingException {
+  public TransactionBatchUnAvailable(HiveEndPoint ep, Exception e) {
+    super("Unable to acquire transaction batch on end point: " + ep, e);
+  }
+}

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+public class TransactionError extends StreamingException {
+  public TransactionError(String msg, Exception e) {
+    super(msg, e);
+  }
+
+  public TransactionError(String msg) {
+    super(msg);
+  }
+}

Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html Thu Apr 10 01:08:59 2014
@@ -0,0 +1,166 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
+        "http://www.w3.org/TR/html4/loose.dtd">
+
+<html lang="en">
+
+<head>
+<meta name=Title content="HCatalog Streaming API">
+<meta name=Keywords content="HCatalog Streaming ACID">
+<meta http-equiv=Content-Type content="text/html; charset=utf-8">
+<title>HCatalog Streaming API</title>
+</head>
+
+<body>
+
+<h1>HCatalog Streaming API -- high level description</h1>
+
+<b>NOTE: The Streaming API feature is provided as a technology
+preview. The API may undergo incompatible changes in upcoming
+releases.</b>
+
+<p>
+Traditionally, adding new data into Hive requires gathering a large
+amount of data onto HDFS and then periodically adding a new
+partition. This is essentially a <i>batch insertion</i>. Insertion of
+new data into an existing partition or table is not done in a way that
+gives consistent results to readers. The Hive Streaming API allows data to
+be pumped continuously into Hive. The incoming data can be
+continuously committed in small batches (of records) into a Hive
+partition. Once data is committed it becomes immediately visible to
+all Hive queries initiated subsequently.</p>
+
+<p>
+This API is intended for streaming clients such as Flume and Storm,
+which continuously generate data. Streaming support is built on top of
+ACID based insert/update support in Hive.</p>
+
+<p>
+The classes and interfaces that are part of the Hive streaming API are broadly
+categorized into two sets. The first set provides support for connection
+and transaction management while the second set provides I/O
+support. Transactions are managed by the Hive MetaStore.  Writes are
+performed to HDFS via Hive wrapper APIs that bypass the MetaStore. </p>
+
+<p>
+<b>Note on packaging</b>: The APIs are defined in the 
+<b>org.apache.hive.hcatalog.streaming</b> Java package and included as 
+the hive-hcatalog-streaming jar.</p>
+
+<h2>STREAMING REQUIREMENTS</h2>
+
+<p>
+A few things are currently required to use streaming.
+</p>
+
+<p>
+<ol>
+  <li> Currently, only the ORC storage format is supported. So
+    '<b>stored as orc</b>' must be specified during table creation.</li>
+  <li> The Hive table must be bucketed, but not sorted. So something like
+    '<b>clustered by (<i>colName</i>) into <i>10</i> buckets</b>' must
+    be specified during table creation. The number of buckets
+    is ideally the same as the number of streaming writers.</li>
+  <li> The user of the client streaming process must have the necessary
+    permissions to write to the table or partition and create partitions in
+    the table.</li>
+  <li> When issuing MapReduce queries on streaming tables, the user must set
+    <b>hive.input.format</b> to 
+    <b>org.apache.hadoop.hive.ql.io.HiveInputFormat</b></li>
+  <li> Settings required in hive-site.xml:
+    <ol>
+      <li><b>hive.txn.manager = 
+	org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</b></li>
+      <li><b>hive.compactor.initiator.on = true</b></li>
+      <li><b>hive.compactor.worker.threads</b> > 0</li>
+    </ol></li>
+</ol></p>
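+
+<p>
+These properties normally belong in hive-site.xml on the cluster. Purely as an
+illustration of the values involved (an embedded or test setup, such as the
+tests accompanying this API, can apply equivalents programmatically), a minimal
+sketch:</p>
+<pre>
+  HiveConf conf = new HiveConf();
+  conf.set("hive.txn.manager",
+      "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+  conf.setBoolean("hive.compactor.initiator.on", true);
+  conf.setInt("hive.compactor.worker.threads", 1);   // any value greater than 0
+  conf.set("hive.input.format",
+      "org.apache.hadoop.hive.ql.io.HiveInputFormat");
+</pre>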
+
+<p>
+<b>Note:</b> Streaming to <b>unpartitioned</b> tables is also
+supported.</p>
+
+<h2>Transaction and Connection management</h2>
+
+<p>
+The class <a href="HiveEndPoint.html"><b>HiveEndPoint</b></a> is a Hive end
+point to connect to. An endpoint is either a Hive table or
+partition. An endpoint is cheap to create and does not internally hold
+on to any network connections.  Invoking the newConnection method on
+it creates a new connection to the Hive MetaStore for streaming
+purposes.  It returns a
+<a href="StreamingConnection.html"><b>StreamingConnection</b></a>
+object. Multiple connections can be established on the same
+endpoint. StreamingConnection can then be used to initiate new
+transactions for performing I/O. </p>
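+
+<p>
+For illustration, a minimal sketch of creating an endpoint and opening a
+connection (the metastore URI, database, table and partition values are
+example values; imports and error handling are omitted):</p>
+<pre>
+  HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083",
+      "testing", "alerts", Arrays.asList("Asia", "India"));
+  StreamingConnection connection = endPt.newConnection(false);
+  // ... fetch transaction batches and write records (see below) ...
+  connection.close();
+</pre>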
+
+<h3>Dynamic Partition Creation:</h3> In a setup where data is being
+streamed continuously (e.g. from Flume), it is often desirable to have
+new partitions created automatically (say, on an hourly basis). In
+such cases requiring the Hive admin to pre-create the necessary
+partitions may not be reasonable.  Consequently the streaming API
+allows streaming clients to create partitions as
+needed. <b>HiveEndPoint.newConnection()</b> accepts an argument to
+indicate if the partition should be auto created. Partition creation
+being an atomic action, multiple clients can race to create the
+partition, but only one would succeed, so streaming clients need not
+synchronize when creating a partition. The user of the client process
+needs to be given write permissions on the Hive table in order to
+create partitions.
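+
+<p>
+A minimal sketch of requesting automatic partition creation (assuming the
+endpoint from the sketch above names a partition that may not yet exist):</p>
+<pre>
+  // 'true' asks the connection to create the partition if it is absent
+  StreamingConnection conn = endPt.newConnection(true);
+</pre>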
+
+<h3>Batching Transactions:</h3> Transactions are implemented slightly
+differently than in traditional database systems. Multiple transactions
+are grouped into a <i>Transaction Batch</i> and each transaction has
+an id. Data from each transaction batch is written to a single file on HDFS,
+which is eventually compacted with other files into a larger file
+automatically for efficiency.
+
+<h3>Basic Steps:</h3> After a connection is established, a streaming
+client first requests a new batch of transactions. In response it
+receives a set of transaction ids that are part of the transaction
+batch. Subsequently the client proceeds to consume one transaction at
+a time by initiating new transactions. The client will write() one or more
+records per transaction and either commit or abort the current
+transaction before switching to the next one. Each
+<b>TransactionBatch.write()</b> invocation automatically associates
+the I/O attempt with the current transaction id. The user of the
+streaming client needs to have write permissions to the partition or
+table.</p>
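+
+<p>
+A minimal sketch of the per-batch write loop, assuming a connection and a
+RecordWriter named writer have already been created (writers are described in
+the next section; the record contents are example values):</p>
+<pre>
+  TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+  while (txnBatch.remainingTransactions() > 0) {
+    txnBatch.beginNextTransaction();
+    txnBatch.write("1,Hello streaming".getBytes());  // one or more writes
+    txnBatch.commit();                               // or txnBatch.abort()
+  }
+  txnBatch.close();
+</pre>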
+
+<p>
+<b>Concurrency Note:</b> I/O can be performed on multiple
+<b>TransactionBatch</b>es concurrently. However, the transactions within a
+transaction batch must be consumed sequentially.</p>
+
+<h2>Writing Data</h2>
+
+<p>
+These classes and interfaces provide support for writing the data to
+Hive within a transaction. 
+<a href="RecordWriter.html"><b>RecordWriter</b></a> is the interface
+implemented by all writers. A writer is responsible for taking a
+record in the form of a <b>byte[]</b> containing data in a known
+format (e.g. CSV) and writing it out in the format supported by Hive
+streaming. A <b>RecordWriter</b> may reorder or drop fields from the incoming
+record if necessary to map them to the corresponding columns in the
+Hive Table. A streaming client will instantiate an appropriate
+<b>RecordWriter</b> type and pass it to 
+<b>StreamingConnection.fetchTransactionBatch()</b>.  The streaming client 
+does not directly interact with the <b>RecordWriter</b> thereafter, but
+relies on the <b>TransactionBatch</b> to do so.</p>
+
+<p>
+Currently, out of the box, the streaming API provides two
+implementations of the <b>RecordWriter</b> interface. One handles delimited
+input data (such as CSV, tab separated, etc.) and the other handles JSON
+(strict syntax). Support for other input formats can be provided by
+additional implementations of the <b>RecordWriter</b> interface (see the
+sketch after the list below).
+<ul>
+<li> <a href="DelimitedInputWriter.html"><b>DelimitedInputWriter</b></a> 
+- Delimited text input.</li>
+<li> <a href="StrictJsonWriter.html"><b>StrictJsonWriter</b></a> 
+- JSON text input.</li>
+</ul></p>
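+
+<p>
+For illustration, the two writers can be constructed as follows (the column
+names are example values; endPt is the endpoint the records will be written
+through):</p>
+<pre>
+  // delimited input: field names, delimiter, end point
+  RecordWriter writer = new DelimitedInputWriter(
+      new String[]{"id", "msg"}, ",", endPt);
+
+  // strict JSON input: fields are mapped to table columns by JsonSerDe
+  RecordWriter jsonWriter = new StrictJsonWriter(endPt);
+</pre>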
+
+</body>
+
+</html>

Added: hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.streaming;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.Parser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.LogUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.Random;
+
+/**
+ * A stand alone utility to write data into the streaming ingest interface.
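+ *
+ * Example invocation (the metastore URI, database, table, columns, types and
+ * partition values are illustrative; the option letters correspond to the
+ * options defined in main()):
+ * <pre>
+ *   sit -m thrift://localhost:9083 -d testing -t alerts -c id,msg -s int,string -p Asia,India
+ * </pre>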
+ */
+public class StreamingIntegrationTester {
+
+  static final private Log LOG = LogFactory.getLog(StreamingIntegrationTester.class.getName());
+
+  public static void main(String[] args) {
+
+    try {
+      LogUtils.initHiveLog4j();
+    } catch (LogUtils.LogInitializationException e) {
+      System.err.println("Unable to initialize log4j " + StringUtils.stringifyException(e));
+      System.exit(-1);
+    }
+
+    Options options = new Options();
+
+    options.addOption(OptionBuilder
+      .hasArg()
+      .withArgName("abort-pct")
+      .withDescription("Percentage of transactions to abort, defaults to 5")
+      .withLongOpt("abortpct")
+      .create('a'));
+
+    options.addOption(OptionBuilder
+      .hasArgs()
+      .withArgName("column-names")
+      .withDescription("column names of table to write to")
+      .withLongOpt("columns")
+      .withValueSeparator(',')
+      .isRequired()
+      .create('c'));
+
+    options.addOption(OptionBuilder
+      .hasArg()
+      .withArgName("database")
+      .withDescription("Database of table to write to")
+      .withLongOpt("database")
+      .isRequired()
+      .create('d'));
+
+    options.addOption(OptionBuilder
+      .hasArg()
+      .withArgName("frequency")
+      .withDescription("How often to commit a transaction, in seconds, defaults to 1")
+      .withLongOpt("frequency")
+      .create('f'));
+
+    options.addOption(OptionBuilder
+      .hasArg()
+      .withArgName("iterations")
+      .withDescription("Number of batches to write, defaults to 10")
+      .withLongOpt("num-batches")
+      .create('i'));
+
+    options.addOption(OptionBuilder
+        .hasArg()
+        .withArgName("metastore-uri")
+        .withDescription("URI of Hive metastore")
+        .withLongOpt("metastore-uri")
+        .isRequired()
+        .create('m'));
+
+    options.addOption(OptionBuilder
+        .hasArg()
+        .withArgName("num_transactions")
+        .withDescription("Number of transactions per batch, defaults to 100")
+        .withLongOpt("num-txns")
+        .create('n'));
+
+    options.addOption(OptionBuilder
+         .hasArgs()
+         .withArgName("partition-values")
+         .withDescription("partition values, must be provided in order of partition columns, " +
+                 "if not provided table is assumed to not be partitioned")
+         .withLongOpt("partition")
+         .withValueSeparator(',')
+         .create('p'));
+
+    options.addOption(OptionBuilder
+         .hasArg()
+         .withArgName("records-per-transaction")
+         .withDescription("records to write in each transaction, defaults to 100")
+         .withLongOpt("records-per-txn")
+         .withValueSeparator(',')
+         .create('r'));
+
+    options.addOption(OptionBuilder
+         .hasArgs()
+         .withArgName("column-types")
+         .withDescription("column types, valid values are string, int, float, decimal, date, " +
+             "datetime")
+         .withLongOpt("schema")
+         .withValueSeparator(',')
+         .isRequired()
+         .create('s'));
+
+    options.addOption(OptionBuilder
+      .hasArg()
+      .withArgName("table")
+      .withDescription("Table to write to")
+      .withLongOpt("table")
+      .isRequired()
+      .create('t'));
+
+    options.addOption(OptionBuilder
+      .hasArg()
+      .withArgName("num-writers")
+      .withDescription("Number of writers to create, defaults to 2")
+      .withLongOpt("writers")
+      .create('w'));
+
+    options.addOption(OptionBuilder
+            .hasArg(false)
+            .withArgName("pause")
+            .withDescription("Wait on keyboard input after commit & batch close. default: disabled")
+            .withLongOpt("pause")
+            .create('x'));
+
+
+    Parser parser = new GnuParser();
+    CommandLine cmdline = null;
+    try {
+      cmdline = parser.parse(options, args);
+    } catch (ParseException e) {
+      System.err.println(e.getMessage());
+      usage(options);
+    }
+
+    boolean pause = cmdline.hasOption('x');
+    String db = cmdline.getOptionValue('d');
+    String table = cmdline.getOptionValue('t');
+    String uri = cmdline.getOptionValue('m');
+    int txnsPerBatch = Integer.valueOf(cmdline.getOptionValue('n', "100"));
+    int writers = Integer.valueOf(cmdline.getOptionValue('w', "2"));
+    int batches = Integer.valueOf(cmdline.getOptionValue('i', "10"));
+    int recordsPerTxn = Integer.valueOf(cmdline.getOptionValue('r', "100"));
+    int frequency = Integer.valueOf(cmdline.getOptionValue('f', "1"));
+    int ap = Integer.valueOf(cmdline.getOptionValue('a', "5"));
+    float abortPct = ((float)ap) / 100.0f;
+    String[] partVals = cmdline.getOptionValues('p');
+    String[] cols = cmdline.getOptionValues('c');
+    String[] types = cmdline.getOptionValues('s');
+
+    StreamingIntegrationTester sit = new StreamingIntegrationTester(db, table, uri,
+        txnsPerBatch, writers, batches, recordsPerTxn, frequency, abortPct, partVals, cols, types
+            , pause);
+    sit.go();
+  }
+
+  static void usage(Options options) {
+    HelpFormatter hf = new HelpFormatter();
+    hf.printHelp(HelpFormatter.DEFAULT_WIDTH, "sit [options]", "Usage: ", options, "");
+    System.exit(-1);
+  }
+
+  private String db;
+  private String table;
+  private String uri;
+  private int txnsPerBatch;
+  private int writers;
+  private int batches;
+  private int recordsPerTxn;
+  private int frequency;
+  private float abortPct;
+  private String[] partVals;
+  private String[] cols;
+  private String[] types;
+  private boolean pause;
+
+
+  private StreamingIntegrationTester(String db, String table, String uri, int txnsPerBatch,
+                                     int writers, int batches,  int recordsPerTxn,
+                                     int frequency, float abortPct, String[] partVals,
+                                     String[] cols, String[] types, boolean pause) {
+    this.db = db;
+    this.table = table;
+    this.uri = uri;
+    this.txnsPerBatch = txnsPerBatch;
+    this.writers = writers;
+    this.batches = batches;
+    this.recordsPerTxn = recordsPerTxn;
+    this.frequency = frequency;
+    this.abortPct = abortPct;
+    this.partVals = partVals;
+    this.cols = cols;
+    this.types = types;
+    this.pause = pause;
+  }
+
+  private void go() {
+    HiveEndPoint endPoint = null;
+    try {
+      if (partVals == null) {
+        endPoint = new HiveEndPoint(uri, db, table, null);
+      } else {
+        endPoint = new HiveEndPoint(uri, db, table, Arrays.asList(partVals));
+      }
+
+      for (int i = 0; i < writers; i++) {
+        Writer w = new Writer(endPoint, i, txnsPerBatch, batches, recordsPerTxn, frequency, abortPct,
+                cols, types, pause);
+        w.start();
+      }
+
+    } catch (Throwable t) {
+      System.err.println("Caught exception while testing: " + StringUtils.stringifyException(t));
+    }
+  }
+
+  private static class Writer extends Thread {
+    private HiveEndPoint endPoint;
+    private int txnsPerBatch;
+    private int batches;
+    private int writerNumber;
+    private int recordsPerTxn;
+    private int frequency;
+    private float abortPct;
+    private String[] cols;
+    private String[] types;
+    private boolean pause;
+    private Random rand;
+
+    Writer(HiveEndPoint endPoint, int writerNumber, int txnsPerBatch, int batches,
+           int recordsPerTxn, int frequency, float abortPct, String[] cols, String[] types
+            , boolean pause) {
+      this.endPoint = endPoint;
+      this.txnsPerBatch = txnsPerBatch;
+      this.batches = batches;
+      this.writerNumber = writerNumber;
+      this.recordsPerTxn = recordsPerTxn;
+      this.frequency = frequency;
+      this.abortPct = abortPct;
+      this.cols = cols;
+      this.types = types;
+      this.pause = pause;
+      rand = new Random();
+    }
+
+    @Override
+    public void run() {
+      StreamingConnection conn = null;
+      try {
+        conn = endPoint.newConnection(true);
+        RecordWriter writer = new DelimitedInputWriter(cols, ",", endPoint);
+
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < batches; i++) {
+          LOG.info("Starting batch " + i);
+          TransactionBatch batch = conn.fetchTransactionBatch(txnsPerBatch, writer);
+          try {
+            while (batch.remainingTransactions() > 0) {
+              batch.beginNextTransaction();
+              for (int j = 0; j < recordsPerTxn; j++) {
+                batch.write(generateRecord(cols, types));
+              }
+              if (rand.nextFloat() < abortPct) batch.abort();
+              else batch.commit();
+              if (pause) {
+                System.out.println("Writer " + writerNumber +
+                        " committed... press Enter to continue. " + Thread.currentThread().getId());
+                System.in.read();
+              }
+            }
+            long end = System.currentTimeMillis();
+            if (end - start < frequency) Thread.sleep(frequency - (end - start));
+          } finally {
+            batch.close();
+            if (pause) {
+              System.out.println("Writer " + writerNumber +
+                  " has closed a Batch.. press Enter to continue. " + Thread.currentThread().getId());
+              System.in.read();
+            }
+          }
+        }
+      } catch (Throwable t) {
+       System.err.println("Writer number " + writerNumber
+               + " caught exception while testing: " + StringUtils.stringifyException(t));
+      } finally {
+        if (conn!=null) conn.close();
+      }
+    }
+
+    private byte[] generateRecord(String[] cols, String[] types) {
+      // TODO make it so I can randomize the column order
+
+      StringBuilder buf = new StringBuilder();
+      for (int i = 0; i < types.length; i++) {
+        buf.append(generateColumn(types[i]));
+        buf.append(",");
+      }
+      return buf.toString().getBytes();
+    }
+
+    private String generateColumn(String type) {
+      if ("string".equals(type.toLowerCase())) {
+        return  "When that Aprilis with his showers swoot";
+      } else if (type.toLowerCase().startsWith("int")) {
+        return "42";
+      } else if (type.toLowerCase().startsWith("dec") || type.toLowerCase().equals("float")) {
+        return "3.141592654";
+      } else if (type.toLowerCase().equals("datetime")) {
+        return "2014-03-07 15:33:22";
+      } else if (type.toLowerCase().equals("date")) {
+        return "1955-11-12";
+      } else {
+        throw new RuntimeException("Sorry, I don't know the type " + type);
+      }
+    }
+  }
+}

Added: hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestDelimitedInputWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestDelimitedInputWriter.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestDelimitedInputWriter.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestDelimitedInputWriter.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import com.google.common.collect.Lists;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+public class TestDelimitedInputWriter {
+  @Test
+  public void testFieldReordering() throws Exception {
+
+    ArrayList<String> colNames = Lists.newArrayList(new String[]{"col1", "col2", "col3", "col4", "col5"});
+    {//1)  test dropping fields - first, middle & last
+      String[] fieldNames = {null, "col2", null, "col4", null};
+      int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+      Assert.assertTrue(Arrays.equals(mapping, new int[]{-1, 1, -1, 3, -1}));
+    }
+
+    {//2)  test reordering
+      String[] fieldNames = {"col5", "col4", "col3", "col2", "col1"};
+      int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+      Assert.assertTrue( Arrays.equals(mapping, new int[]{4,3,2,1,0}) );
+    }
+
+    {//3)  test bad field names
+      String[] fieldNames = {"xyz", "abc", "col3", "col4", "as"};
+      try {
+        DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+        Assert.fail();
+      } catch (InvalidColumn e) {
+        // should throw
+      }
+    }
+
+    {//4)  test fewer field names than columns
+      String[] fieldNames = {"col3", "col4"};
+      int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+      Assert.assertTrue( Arrays.equals(mapping, new int[]{2,3}) );
+    }
+
+    {//5)  test extra field names
+      String[] fieldNames = {"col5", "col4", "col3", "col2", "col1", "col1"};
+      try {
+      DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+      Assert.fail();
+      } catch (InvalidColumn e) {
+        // should throw
+      }
+    }
+  }
+}

Added: hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,800 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class TestStreaming {
+
+  public static class RawFileSystem extends RawLocalFileSystem {
+    private static final URI NAME;
+    static {
+      try {
+        NAME = new URI("raw:///");
+      } catch (URISyntaxException se) {
+        throw new IllegalArgumentException("bad uri", se);
+      }
+    }
+
+    @Override
+    public URI getUri() {
+      return NAME;
+    }
+
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+      File file = pathToFile(path);
+      if (!file.exists()) {
+        throw new FileNotFoundException("Can't find " + path);
+      }
+      // get close enough
+      short mod = 0;
+      if (file.canRead()) {
+        mod |= 0444;
+      }
+      if (file.canWrite()) {
+        mod |= 0200;
+      }
+      if (file.canExecute()) {
+        mod |= 0111;
+      }
+      return new FileStatus(file.length(), file.isDirectory(), 1, 1024,
+          file.lastModified(), file.lastModified(),
+          FsPermission.createImmutable(mod), "owen", "users", path);
+    }
+  }
+
+  private static final String COL1 = "id";
+  private static final String COL2 = "msg";
+
+  private final HiveConf conf;
+  private final IMetaStoreClient msClient;
+
+  final String metaStoreURI = null;
+
+  // partitioned table
+  private final static String dbName = "testing";
+  private final static String tblName = "alerts";
+  private final static String[] fieldNames = new String[]{COL1,COL2};
+  List<String> partitionVals;
+  private static String partLocation;
+
+  // unpartitioned table
+  private final static String dbName2 = "testing";
+  private final static String tblName2 = "alerts";
+  private final static String[] fieldNames2 = new String[]{COL1,COL2};
+
+  private final String PART1_CONTINENT = "Asia";
+  private final String PART1_COUNTRY = "India";
+
+  @Rule
+  public TemporaryFolder dbFolder = new TemporaryFolder();
+
+
+  public TestStreaming() throws Exception {
+    partitionVals = new ArrayList<String>(2);
+    partitionVals.add(PART1_CONTINENT);
+    partitionVals.add(PART1_COUNTRY);
+
+    conf = new HiveConf(this.getClass());
+    conf.set("fs.raw.impl", RawFileSystem.class.getName());
+    TxnDbUtil.setConfValues(conf);
+    if (metaStoreURI!=null) {
+      conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
+    }
+    conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+
+    //1) Start from a clean slate (metastore)
+    TxnDbUtil.cleanDb();
+    TxnDbUtil.prepDb();
+
+    //2) obtain metastore clients
+    msClient = new HiveMetaStoreClient(conf);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    // drop and recreate the necessary databases and tables
+    dropDB(msClient, dbName);
+    createDbAndTable(msClient, dbName, tblName, partitionVals);
+
+    dropDB(msClient, dbName2);
+    createDbAndTable(msClient, dbName2, tblName2, partitionVals);
+  }
+
+  private static List<FieldSchema> getPartitionKeys() {
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    // Defining partition names in unsorted order
+    fields.add(new FieldSchema("continent", serdeConstants.STRING_TYPE_NAME, ""));
+    fields.add(new FieldSchema("country", serdeConstants.STRING_TYPE_NAME, ""));
+    return fields;
+  }
+
+  private void checkDataWritten(long minTxn, long maxTxn, int buckets, int numExpectedFiles,
+                                String... records) throws Exception {
+    ValidTxnList txns = msClient.getValidTxns();
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), conf, txns);
+    Assert.assertEquals(0, dir.getObsolete().size());
+    Assert.assertEquals(0, dir.getOriginalFiles().size());
+    List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+    System.out.println("Files found: ");
+    for (AcidUtils.ParsedDelta pd : current) System.out.println(pd.getPath().toString());
+    Assert.assertEquals(numExpectedFiles, current.size());
+
+    // find the absolute minimum and maximum transaction ids across the deltas
+    long min = Long.MAX_VALUE;
+    long max = Long.MIN_VALUE;
+    for (AcidUtils.ParsedDelta pd : current) {
+      if (pd.getMaxTransaction() > max) max = pd.getMaxTransaction();
+      if (pd.getMinTransaction() < min) min = pd.getMinTransaction();
+    }
+    Assert.assertEquals(minTxn, min);
+    Assert.assertEquals(maxTxn, max);
+
+    InputFormat inf = new OrcInputFormat();
+    JobConf job = new JobConf();
+    job.set("mapred.input.dir", partLocation.toString());
+    job.set("bucket_count", Integer.toString(buckets));
+    job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+    InputSplit[] splits = inf.getSplits(job, 1);
+    Assert.assertEquals(1, splits.length);
+    org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
+            inf.getRecordReader(splits[0], job, Reporter.NULL);
+
+    NullWritable key = rr.createKey();
+    OrcStruct value = rr.createValue();
+    for (int i = 0; i < records.length; i++) {
+      Assert.assertEquals(true, rr.next(key, value));
+      Assert.assertEquals(records[i], value.toString());
+    }
+    Assert.assertEquals(false, rr.next(key, value));
+  }
+
+  private void checkNothingWritten() throws Exception {
+    ValidTxnList txns = msClient.getValidTxns();
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), conf, txns);
+    Assert.assertEquals(0, dir.getObsolete().size());
+    Assert.assertEquals(0, dir.getOriginalFiles().size());
+    List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+    Assert.assertEquals(0, current.size());
+  }
+
+  @Test
+  public void testEndpointConnection() throws Exception {
+    // 1) Basic
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
+            , partitionVals);
+    StreamingConnection connection = endPt.newConnection(false, null); //shouldn't throw
+    connection.close();
+
+    // 2) Leave partition unspecified
+    endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
+    endPt.newConnection(false, null).close(); // should not throw
+  }
+
+  @Test
+  public void testAddPartition() throws Exception {
+    List<String> newPartVals = new ArrayList<String>(2);
+    newPartVals.add(PART1_CONTINENT);
+    newPartVals.add("Nepal");
+
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
+            , newPartVals);
+
+    // Ensure partition is absent
+    try {
+      msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
+      Assert.assertTrue("Partition already exists", false);
+    } catch (NoSuchObjectException e) {
+      // expect this exception
+    }
+
+    // Create partition
+    Assert.assertNotNull(endPt.newConnection(true, null));
+
+    // Ensure partition is present
+    Partition p = msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
+    Assert.assertNotNull("Did not find added partition", p);
+  }
+
+  @Test
+  public void testTransactionBatchEmptyCommit() throws Exception {
+    // 1)  to partitioned table
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+    StreamingConnection connection = endPt.newConnection(false, null);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+
+    txnBatch.beginNextTransaction();
+    txnBatch.commit();
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.close();
+    connection.close();
+
+    // 2) To unpartitioned table
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    writer = new DelimitedInputWriter(fieldNames2,",", endPt);
+    connection = endPt.newConnection(false, null);
+
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.commit();
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.close();
+    connection.close();
+  }
+
+  @Test
+  public void testTransactionBatchEmptyAbort() throws Exception {
+    // 1) to partitioned table
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+    StreamingConnection connection = endPt.newConnection(true);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.abort();
+    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.close();
+    connection.close();
+
+    // 2) to unpartitioned table
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    writer = new DelimitedInputWriter(fieldNames,",", endPt);
+    connection = endPt.newConnection(true);
+
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.abort();
+    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.close();
+    connection.close();
+  }
+
+  @Test
+  public void testTransactionBatchCommit_Delimited() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+    StreamingConnection connection = endPt.newConnection(true);
+
+    // 1st Txn
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.commit();
+
+    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch.getCurrentTransactionState());
+
+    // 2nd Txn
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+
+    // data should not be visible
+    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+
+    txnBatch.commit();
+
+    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
+        "{2, Welcome to streaming}");
+
+    txnBatch.close();
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+            , txnBatch.getCurrentTransactionState());
+
+
+    connection.close();
+
+
+    // To Unpartitioned table
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    writer = new DelimitedInputWriter(fieldNames,",", endPt);
+    connection = endPt.newConnection(true);
+
+    // 1st Txn
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.commit();
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch.getCurrentTransactionState());
+    connection.close();
+  }
+
+  @Test
+  public void testTransactionBatchCommit_Json() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    StrictJsonWriter writer = new StrictJsonWriter(endPt);
+    StreamingConnection connection = endPt.newConnection(true);
+
+    // 1st Txn
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+            , txnBatch.getCurrentTransactionState());
+    String rec1 = "{\"id\" : 1, \"msg\": \"Hello streaming\"}";
+    txnBatch.write(rec1.getBytes());
+    txnBatch.commit();
+
+    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch.getCurrentTransactionState());
+
+    txnBatch.close();
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+            , txnBatch.getCurrentTransactionState());
+
+    connection.close();
+  }
+
+  @Test
+  public void testRemainingTransactions() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+    StreamingConnection connection = endPt.newConnection(true);
+
+    // 1) test with txn.Commit()
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    int batch=0;
+    int initialCount = txnBatch.remainingTransactions();
+    while (txnBatch.remainingTransactions()>0) {
+      txnBatch.beginNextTransaction();
+      Assert.assertEquals(--initialCount, txnBatch.remainingTransactions());
+      for (int rec=0; rec<2; ++rec) {
+        Assert.assertEquals(TransactionBatch.TxnState.OPEN
+                , txnBatch.getCurrentTransactionState());
+        txnBatch.write((batch * rec + ",Hello streaming").getBytes());
+      }
+      txnBatch.commit();
+      Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+              , txnBatch.getCurrentTransactionState());
+      ++batch;
+    }
+    Assert.assertEquals(0, txnBatch.remainingTransactions());
+    txnBatch.close();
+
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+            , txnBatch.getCurrentTransactionState());
+
+    // 2) test with txn.Abort()
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    batch=0;
+    initialCount = txnBatch.remainingTransactions();
+    while (txnBatch.remainingTransactions()>0) {
+      txnBatch.beginNextTransaction();
+      Assert.assertEquals(--initialCount,txnBatch.remainingTransactions());
+      for (int rec=0; rec<2; ++rec) {
+        Assert.assertEquals(TransactionBatch.TxnState.OPEN
+                , txnBatch.getCurrentTransactionState());
+        txnBatch.write((batch * rec + ",Hello streaming").getBytes());
+      }
+      txnBatch.abort();
+      Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+              , txnBatch.getCurrentTransactionState());
+      ++batch;
+    }
+    Assert.assertEquals(0,txnBatch.remainingTransactions());
+    txnBatch.close();
+
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+            , txnBatch.getCurrentTransactionState());
+
+    connection.close();
+  }
+
+  @Test
+  public void testTransactionBatchAbort() throws Exception {
+
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+    StreamingConnection connection = endPt.newConnection(false);
+
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+    txnBatch.abort();
+
+    checkNothingWritten();
+
+    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+            , txnBatch.getCurrentTransactionState());
+
+    txnBatch.close();
+    connection.close();
+
+    checkNothingWritten();
+
+  }
+
+
+  @Test
+  public void testTransactionBatchAbortAndCommit() throws Exception {
+
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+    StreamingConnection connection = endPt.newConnection(false);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+    txnBatch.abort();
+
+    checkNothingWritten();
+
+    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+            , txnBatch.getCurrentTransactionState());
+
+    txnBatch.beginNextTransaction();
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+    txnBatch.commit();
+
+    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
+        "{2, Welcome to streaming}");
+
+    txnBatch.close();
+    connection.close();
+  }
+
+  @Test
+  public void testMultipleTransactionBatchCommits() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+    StreamingConnection connection = endPt.newConnection(true);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.commit();
+
+    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+
+    txnBatch.beginNextTransaction();
+    txnBatch.write("2,Welcome to streaming".getBytes());
+    txnBatch.commit();
+
+    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
+        "{2, Welcome to streaming}");
+
+    txnBatch.close();
+
+    // 2nd Txn Batch
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("3,Hello streaming - once again".getBytes());
+    txnBatch.commit();
+
+    checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
+        "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
+
+    txnBatch.beginNextTransaction();
+    txnBatch.write("4,Welcome to streaming - once again".getBytes());
+    txnBatch.commit();
+
+    checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
+        "{2, Welcome to streaming}", "{3, Hello streaming - once again}",
+        "{4, Welcome to streaming - once again}");
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch.getCurrentTransactionState());
+
+    txnBatch.close();
+
+    connection.close();
+  }
+
+
+  @Test
+  public void testInterleavedTransactionBatchCommits() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);
+    StreamingConnection connection = endPt.newConnection(false);
+
+    // Acquire 1st Txn Batch
+    TransactionBatch txnBatch1 =  connection.fetchTransactionBatch(10, writer);
+    txnBatch1.beginNextTransaction();
+
+    // Acquire 2nd Txn Batch
+    DelimitedInputWriter writer2 = new DelimitedInputWriter(fieldNames, ",", endPt);
+    TransactionBatch txnBatch2 =  connection.fetchTransactionBatch(10, writer2);
+    txnBatch2.beginNextTransaction();
+
+    // Interleaved writes to both batches
+    txnBatch1.write("1,Hello streaming".getBytes());
+    txnBatch2.write("3,Hello streaming - once again".getBytes());
+
+    checkNothingWritten();
+
+    txnBatch2.commit();
+
+    checkDataWritten(11, 20, 1, 1, "{3, Hello streaming - once again}");
+
+    txnBatch1.commit();
+
+    checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+
+    txnBatch1.beginNextTransaction();
+    txnBatch1.write("2,Welcome to streaming".getBytes());
+
+    txnBatch2.beginNextTransaction();
+    txnBatch2.write("4,Welcome to streaming - once again".getBytes());
+
+    checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+
+    txnBatch1.commit();
+
+    checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
+        "{2, Welcome to streaming}",
+        "{3, Hello streaming - once again}");
+
+    txnBatch2.commit();
+
+    checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
+        "{2, Welcome to streaming}",
+        "{3, Hello streaming - once again}",
+        "{4, Welcome to streaming - once again}");
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch1.getCurrentTransactionState());
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch2.getCurrentTransactionState());
+
+    txnBatch1.close();
+    txnBatch2.close();
+
+    connection.close();
+  }
+
+  class WriterThd extends Thread {
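+    // Note: exceptions thrown from run() go to the thread's uncaught-exception
+    // handler rather than to JUnit, so a failing writer thread does not by
+    // itself fail testConcurrentTransactionBatchCommits.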
+
+    private StreamingConnection conn;
+    private HiveEndPoint ep;
+    private DelimitedInputWriter writer;
+    private String data;
+
+    WriterThd(HiveEndPoint ep, String data) throws Exception {
+      this.ep = ep;
+      writer = new DelimitedInputWriter(fieldNames, ",", ep);
+      conn = ep.newConnection(false);
+      this.data = data;
+    }
+
+    @Override
+    public void run() {
+      TransactionBatch txnBatch = null;
+      try {
+        txnBatch =  conn.fetchTransactionBatch(1000, writer);
+        while (txnBatch.remainingTransactions() > 0) {
+          txnBatch.beginNextTransaction();
+          txnBatch.write(data.getBytes());
+          txnBatch.write(data.getBytes());
+          txnBatch.commit();
+        } // while
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      } finally {
+        if (txnBatch != null) {
+          try {
+            txnBatch.close();
+          } catch (Exception e) {
+            conn.close();
+            throw new RuntimeException(e);
+          }
+        }
+        try {
+          conn.close();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+
+      }
+    }
+  }
+
+  @Test
+  public void testConcurrentTransactionBatchCommits() throws Exception {
+    final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+    WriterThd t1 = new WriterThd(ep, "1,Matrix");
+    WriterThd t2 = new WriterThd(ep, "2,Gandhi");
+    WriterThd t3 = new WriterThd(ep, "3,Silence");
+
+    t1.start();
+    t2.start();
+    t3.start();
+
+    t1.join();
+    t2.join();
+    t3.join();
+
+  }
+
+  // delete db and all tables in it
+  public static void dropDB(IMetaStoreClient client, String databaseName) {
+    try {
+      for (String table : client.listTableNamesByFilter(databaseName, "", (short)-1)) {
+        client.dropTable(databaseName, table, true, true);
+      }
+      client.dropDatabase(databaseName);
+    } catch (TException e) {
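+      // Best-effort cleanup: ignore metastore errors (e.g. the database does not exist).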
+    }
+
+  }
+
+  public void createDbAndTable(IMetaStoreClient client, String databaseName,
+                               String tableName, List<String> partVals)
+          throws Exception {
+    Database db = new Database();
+    db.setName(databaseName);
+    String dbLocation = "raw://" + dbFolder.newFolder(databaseName + ".db").getCanonicalPath();
+    db.setLocationUri(dbLocation);
+    client.createDatabase(db);
+
+    Table tbl = new Table();
+    tbl.setDbName(databaseName);
+    tbl.setTableName(tableName);
+    tbl.setTableType(TableType.MANAGED_TABLE.toString());
+    StorageDescriptor sd = new StorageDescriptor();
+    sd.setCols(getTableColumns());
+    sd.setNumBuckets(1);
+    sd.setLocation(dbLocation + Path.SEPARATOR + tableName);
+    tbl.setPartitionKeys(getPartitionKeys());
+
+    tbl.setSd(sd);
+
+    sd.setBucketCols(new ArrayList<String>(2));
+    sd.setSerdeInfo(new SerDeInfo());
+    sd.getSerdeInfo().setName(tbl.getTableName());
+    sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+    sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+
+    sd.getSerdeInfo().setSerializationLib(OrcSerde.class.getName());
+    sd.setInputFormat(HiveInputFormat.class.getName());
+    sd.setOutputFormat(OrcOutputFormat.class.getName());
+
+    Map<String, String> tableParams = new HashMap<String, String>();
+    tbl.setParameters(tableParams);
+    client.createTable(tbl);
+
+    try {
+      addPartition(client, tbl, partVals);
+    } catch (AlreadyExistsException e) {
+    }
+    Partition createdPartition = client.getPartition(databaseName, tableName, partVals);
+    partLocation = createdPartition.getSd().getLocation();
+  }
+
+  private static void addPartition(IMetaStoreClient client, Table tbl
+          , List<String> partValues)
+          throws IOException, TException {
+    Partition part = new Partition();
+    part.setDbName(tbl.getDbName());
+    part.setTableName(tbl.getTableName());
+    StorageDescriptor sd = new StorageDescriptor(tbl.getSd());
+    sd.setLocation(sd.getLocation() + Path.SEPARATOR + makePartPath(tbl.getPartitionKeys()
+            , partValues));
+    part.setSd(sd);
+    part.setValues(partValues);
+    client.add_partition(part);
+  }
+
+  private static String makePartPath(List<FieldSchema> partKeys, List<String> partVals) {
+    if (partKeys.size()!=partVals.size()) {
+      throw new IllegalArgumentException("Partition values " + partVals
+              + " do not match the partition keys of the table: " + partKeys);
+    }
+    StringBuffer buff = new StringBuffer(partKeys.size()*20);
+    buff.append(" ( ");
+    int i=0;
+    for (FieldSchema schema : partKeys) {
+      buff.append(schema.getName());
+      buff.append("='");
+      buff.append(partVals.get(i));
+      buff.append("'");
+      if (i!=partKeys.size()-1) {
+        buff.append(Path.SEPARATOR);
+      }
+      ++i;
+    }
+    buff.append(" )");
+    return buff.toString();
+  }
+
+
+  private static List<FieldSchema> getTableColumns() {
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    fields.add(new FieldSchema(COL1, serdeConstants.INT_TYPE_NAME, ""));
+    fields.add(new FieldSchema(COL2, serdeConstants.STRING_TYPE_NAME, ""));
+    return fields;
+  }
+}
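
For reference, the client-side call sequence these tests exercise boils down to
the minimal sketch below; the metastore URI, database, table, partition values
and column names are placeholders, and handling of the checked streaming
exceptions is left to the caller.

    String metaStoreURI = "thrift://localhost:9083";             // placeholder URI
    String[] fieldNames = {"id", "msg"};                         // placeholder column names
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testing", "alerts",
        java.util.Arrays.asList("Asia", "India"));               // placeholder db/table/partition values
    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);
    StreamingConnection connection = endPt.newConnection(true);  // boolean as used in the tests above

    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
    while (txnBatch.remainingTransactions() > 0) {
      txnBatch.beginNextTransaction();
      txnBatch.write("1,Hello streaming".getBytes());
      txnBatch.commit();                                         // or abort() to discard the writes
    }
    txnBatch.close();
    connection.close();

As testTransactionBatchAbortAndCommit shows, an aborted transaction does not
invalidate the batch: beginNextTransaction() can be called again and the next
commit() succeeds.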

Added: hive/branches/branch-0.13/hcatalog/streaming/src/test/sit
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/test/sit?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/test/sit (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/test/sit Thu Apr 10 01:08:59 2014
@@ -0,0 +1,39 @@
+#!/bin/sh
+
+if [ "${HADOOP_HOME}x" = "x" ]
+  then
+  echo "Please set HADOOP_HOME";
+  exit 1
+fi
+
+if [ "${HIVE_HOME}x" = "x" ]
+  then
+  echo "Please set HIVE_HOME";
+  exit 1
+fi
+
+if [ "${JAVA_HOME}x" = "x" ]
+  then
+  echo "Please set JAVA_HOME";
+  exit 1
+fi
+
+for jar in ${HADOOP_HOME}/client/*.jar
+  do
+  CLASSPATH=${CLASSPATH}:$jar
+done
+
+for jar in ${HIVE_HOME}/lib/*.jar
+  do
+  CLASSPATH=${CLASSPATH}:$jar
+done
+
+for jar in ${HIVE_HOME}/hcatalog/share/hcatalog/*.jar
+  do
+  CLASSPATH=${CLASSPATH}:$jar
+done
+
+CLASSPATH=${CLASSPATH}:${HADOOP_HOME}/conf
+CLASSPATH=${CLASSPATH}:${HIVE_HOME}/conf
+
+$JAVA_HOME/bin/java -cp "${CLASSPATH}" org.apache.hive.hcatalog.streaming.StreamingIntegrationTester "$@"

Modified: hive/branches/branch-0.13/packaging/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/packaging/pom.xml?rev=1586190&r1=1586189&r2=1586190&view=diff
==============================================================================
--- hive/branches/branch-0.13/packaging/pom.xml (original)
+++ hive/branches/branch-0.13/packaging/pom.xml Thu Apr 10 01:08:59 2014
@@ -133,6 +133,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-streaming</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
       <artifactId>hive-hcatalog-hbase-storage-handler</artifactId>
       <version>${project.version}</version>
     </dependency>

Modified: hive/branches/branch-0.13/packaging/src/main/assembly/bin.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/packaging/src/main/assembly/bin.xml?rev=1586190&r1=1586189&r2=1586190&view=diff
==============================================================================
--- hive/branches/branch-0.13/packaging/src/main/assembly/bin.xml (original)
+++ hive/branches/branch-0.13/packaging/src/main/assembly/bin.xml Thu Apr 10 01:08:59 2014
@@ -52,6 +52,7 @@
         <include>org.apache.hive.hcatalog:hive-hcatalog-core</include>
         <include>org.apache.hive.hcatalog:hive-hcatalog-pig-adapter</include>
         <include>org.apache.hive.hcatalog:hive-hcatalog-server-extensions</include>
+        <include>org.apache.hive.hcatalog:hive-hcatalog-streaming</include>
       </includes>
     </dependencySet>
     <dependencySet>