You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2014/04/10 03:09:01 UTC
svn commit: r1586190 [2/2] - in /hive/branches/branch-0.13: hcatalog/
hcatalog/streaming/ hcatalog/streaming/src/ hcatalog/streaming/src/java/
hcatalog/streaming/src/java/org/ hcatalog/streaming/src/java/org/apache/
hcatalog/streaming/src/java/org/apac...
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.hcatalog.data.JsonSerDe;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Streaming writer for UTF-8 encoded JSON records (strict syntax).
+ * Each record is parsed with org.apache.hive.hcatalog.data.JsonSerDe and the
+ * resulting row object is passed to {@code updater.insert}.
+ */
+public class StrictJsonWriter extends AbstractRecordWriter {
+  // Created on the first getSerde() call and reused afterwards.
+  private JsonSerDe serde;
+
+  /**
+   * Creates a writer for the given end point, passing a null HiveConf to the
+   * superclass constructor.
+   *
+   * @param endPoint the end point to write to
+   * @throws ConnectionError propagated from the superclass constructor
+   * @throws SerializationError propagated from the superclass constructor
+   * @throws StreamingException propagated from the superclass constructor
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint)
+      throws ConnectionError, SerializationError, StreamingException {
+    super(endPoint, null);
+  }
+
+  /**
+   * Creates a writer for the given end point with an explicit Hive configuration.
+   *
+   * @param endPoint the end point to write to
+   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+   * @throws ConnectionError propagated from the superclass constructor
+   * @throws SerializationError propagated from the superclass constructor
+   * @throws StreamingException propagated from the superclass constructor
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf)
+      throws ConnectionError, SerializationError, StreamingException {
+    super(endPoint, conf);
+  }
+
+  /**
+   * Returns the JsonSerDe used by this writer, creating and initializing it
+   * from {@code tbl} and {@code conf} on the first call.
+   *
+   * @return the cached serde instance
+   * @throws SerializationError if the serde could not be initialized
+   */
+  @Override
+  SerDe getSerde() throws SerializationError {
+    if (serde == null) {
+      serde = createSerde(tbl, conf);
+    }
+    return serde;
+  }
+
+  /**
+   * Parses one JSON record and inserts the resulting row under the given transaction.
+   *
+   * @param transactionId id of the transaction the record belongs to
+   * @param record UTF-8 encoded JSON text of a single record
+   * @throws StreamingIOFailure if the insert fails with an IOException
+   * @throws SerializationError if the record cannot be parsed
+   */
+  @Override
+  public void write(long transactionId, byte[] record)
+      throws StreamingIOFailure, SerializationError {
+    try {
+      Object encodedRow = encode(record);
+      updater.insert(transactionId, encodedRow);
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Error writing record in transaction("
+          + transactionId + ")", e);
+    }
+  }
+
+  /**
+   * Creates and initializes a JsonSerDe from the table's metadata.
+   *
+   * @param tbl used to create serde
+   * @param conf used to create serde
+   * @return a newly initialized JsonSerDe
+   * @throws SerializationError if serde could not be initialized
+   */
+  private static JsonSerDe createSerde(Table tbl, HiveConf conf)
+      throws SerializationError {
+    try {
+      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      JsonSerDe serde = new JsonSerDe();
+      serde.initialize(conf, tableProps);
+      return serde;
+    } catch (SerDeException e) {
+      throw new SerializationError("Error initializing serde " + JsonSerDe.class.getName(), e);
+    }
+  }
+
+  /**
+   * Converts the UTF-8 encoded JSON bytes of one record into a row object
+   * using the JsonSerDe.
+   *
+   * @param utf8StrRecord UTF-8 encoded JSON text of a single record
+   * @return The encoded object
+   * @throws SerializationError if the serde cannot be created or the record
+   *         cannot be deserialized
+   */
+  private Object encode(byte[] utf8StrRecord) throws SerializationError {
+    try {
+      Text blob = new Text(utf8StrRecord);
+      // Go through getSerde() rather than the raw field so the serde is
+      // initialized even if nothing has requested it before the first write.
+      return getSerde().deserialize(blob);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to convert byte[] record into Object", e);
+    }
+  }
+
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+
+import java.util.Collection;
+
+/**
+ * A set of transactions handed out by Hive. Each transaction in the batch can be
+ * opened, written to, and then committed or aborted. The interface is designed so
+ * that the transactions of one batch are used up sequentially. For concurrent
+ * streaming, use several transaction batches, each initialized with its own
+ * RecordWriter.
+ */
+public interface TransactionBatch {
+
+  /** States reported by {@link #getCurrentTransactionState()}. */
+  enum TxnState { INACTIVE, OPEN, COMMITTED, ABORTED }
+
+  /**
+   * Activates the next available transaction in this batch.
+   *
+   * @throws StreamingException if not able to switch to next Txn
+   * @throws InterruptedException if the call is interrupted
+   */
+  void beginNextTransaction() throws StreamingException, InterruptedException;
+
+  /**
+   * Returns the id of the currently open transaction.
+   *
+   * @return transaction id
+   */
+  Long getCurrentTxnId();
+
+  /**
+   * Returns the state of the current transaction.
+   *
+   * @return state of the current transaction
+   */
+  TxnState getCurrentTransactionState();
+
+  /**
+   * Commits the currently open transaction.
+   *
+   * @throws StreamingException if there are errors committing
+   * @throws InterruptedException if the call is interrupted
+   */
+  void commit() throws StreamingException, InterruptedException;
+
+  /**
+   * Aborts the currently open transaction.
+   *
+   * @throws StreamingException if there are errors
+   * @throws InterruptedException if the call is interrupted
+   */
+  void abort() throws StreamingException, InterruptedException;
+
+  /**
+   * Counts the transactions that are neither committed, aborted nor open.
+   * The currently open transaction is not counted as remaining.
+   *
+   * @return number of transactions remaining in this batch
+   */
+  int remainingTransactions();
+
+  /**
+   * Writes a single record using the RecordWriter.
+   *
+   * @param record the data to be written
+   * @throws StreamingException if there are errors when writing
+   * @throws InterruptedException if the call is interrupted
+   */
+  void write(byte[] record) throws StreamingException, InterruptedException;
+
+  /**
+   * Writes several records using the RecordWriter.
+   *
+   * @param records the records to be written
+   * @throws StreamingException if there are errors when writing
+   * @throws InterruptedException if the call is interrupted
+   */
+  void write(Collection<byte[]> records) throws StreamingException, InterruptedException;
+
+  /**
+   * Issues a heartbeat to the Hive metastore for the current and remaining
+   * transaction ids to keep them from expiring.
+   *
+   * @throws StreamingException if there are errors
+   */
+  void heartbeat() throws StreamingException;
+
+  /**
+   * Closes the TransactionBatch.
+   *
+   * @throws StreamingException if there are errors closing batch
+   * @throws InterruptedException if the call is interrupted
+   */
+  void close() throws StreamingException, InterruptedException;
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatchUnAvailable.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatchUnAvailable.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatchUnAvailable.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatchUnAvailable.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+/**
+ * Streaming exception whose message reports that a transaction batch could not
+ * be acquired on a given end point.
+ */
+public class TransactionBatchUnAvailable extends StreamingException {
+ /**
+ * @param ep end point; its string form is appended to the exception message
+ * @param e underlying exception, passed on to the StreamingException constructor
+ */
+ public TransactionBatchUnAvailable(HiveEndPoint ep, Exception e) {
+ super("Unable to acquire transaction batch on end point: " + ep, e);
+ }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+/**
+ * StreamingException subtype; both constructors simply forward to the superclass.
+ * NOTE(review): judging by the name this signals transaction-related failures;
+ * the throwing code is not visible here.
+ */
+public class TransactionError extends StreamingException {
+ /**
+ * @param msg exception message
+ * @param e underlying exception, passed on to the StreamingException constructor
+ */
+ public TransactionError(String msg, Exception e) {
+ super(msg, e);
+ }
+
+ /**
+ * @param msg exception message
+ */
+ public TransactionError(String msg) {
+ super(msg);
+ }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html Thu Apr 10 01:08:59 2014
@@ -0,0 +1,166 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
+ "http://www.w3.org/TR/html4/loose.dtd">
+
+<html lang="en">
+
+<head>
+<meta name=Title content="HCatalog Streaming API">
+<meta name=Keywords content="HCatalog Streaming ACID">
+<meta http-equiv=Content-Type content="text/html; charset=macintosh">
+<title>HCatalog Streaming API</title>
+</head>
+
+<body>
+
+<h1>HCatalog Streaming API -- high level description</h1>
+
+<b>NOTE: The Streaming API feature is provided as a technology
+preview. The API may undergo incompatible changes in upcoming
+releases.</b>
+
+<p>
+Traditionally adding new data into hive requires gathering a large
+amount of data onto HDFS and then periodically adding a new
+partition. This is essentially a <i>batch insertion</i>. Insertion of
+new data into an existing partition or table is not done in a way that
+gives consistent results to readers. Hive Streaming API allows data to
+be pumped continuously into Hive. The incoming data can be
+continuously committed in small batches (of records) into a Hive
+partition. Once data is committed it becomes immediately visible to
+all Hive queries initiated subsequently.</p>
+
+<p>
+This API is intended for streaming clients such as Flume and Storm,
+which continuously generate data. Streaming support is built on top of
+ACID based insert/update support in Hive.</p>
+
+<p>
+The classes and interfaces that make up the Hive streaming API are broadly
+categorized into two sets. The first set provides support for connection
+and transaction management while the second set provides I/O
+support. Transactions are managed by the Hive MetaStore. Writes are
+performed to HDFS via Hive wrapper APIs that bypass MetaStore. </p>
+
+<p>
+<b>Note on packaging</b>: The APIs are defined in the
+<b>org.apache.hive.hcatalog.streaming</b> Java package and included as
+the hive-hcatalog-streaming jar.</p>
+
+<h2>STREAMING REQUIREMENTS</h2>
+
+<p>
+A few things are currently required to use streaming.
+</p>
+
+<p>
+<ol>
+ <li> Currently, only ORC storage format is supported. So
+ '<b>stored as orc</b>' must be specified during table creation.</li>
+ <li> The hive table must be bucketed, but not sorted. So something like
+ '<b>clustered by (<i>colName</i>) into <i>10</i> buckets</b>' must
+ be specified during table creation. The number of buckets
+ is ideally the same as the number of streaming writers.</li>
+ <li> User of the client streaming process must have the necessary
+ permissions to write to the table or partition and create partitions in
+ the table.</li>
+ <li> When issuing MapReduce queries on streaming tables, the user must set
+ <b>hive.input.format</b> to
+ <b>org.apache.hadoop.hive.ql.io.HiveInputFormat</b></li>
+ <li> Settings required in hive-site.xml:
+ <ol>
+ <li><b>hive.txn.manager =
+ org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</b></li>
+ <li><b>hive.compactor.initiator.on = true</b></li>
+ <li><b>hive.compactor.worker.threads</b> &gt; 0</li>
+ </ol></li>
+</ol></p>
+
+<p>
+<b>Note:</b> Streaming to <b>unpartitioned</b> tables is also
+supported.</p>
+
+<h2>Transaction and Connection management</h2>
+
+<p>
+The class <a href="HiveEndPoint.html"><b>HiveEndPoint</b></a> is a Hive end
+point to connect to. An endpoint is either a Hive table or
+partition. An endpoint is cheap to create and does not internally hold
+on to any network connections. Invoking the newConnection method on
+it creates a new connection to the Hive MetaStore for streaming
+purposes. It returns a
+<a href="StreamingConnection.html"><b>StreamingConnection</b></a>
+object. Multiple connections can be established on the same
+endpoint. StreamingConnection can then be used to initiate new
+transactions for performing I/O. </p>
+
+<h3>Dynamic Partition Creation:</h3> In a setup where data is being
+streamed continuously (e.g. Flume), it is often desirable to have
+new partitions created automatically (say on an
+hourly basis). In such cases requiring the Hive admin to pre-create
+the necessary partitions may not be reasonable. Consequently the
+streaming API allows streaming clients to create partitions as
+needed. <b>HiveEndPoint.newConnection()</b> accepts an argument to
+indicate if the partition should be auto created. Partition creation
+being an atomic action, multiple clients can race to create the
+partition, but only one would succeed, so streaming clients need not
+synchronize when creating a partition. The user of the client process
+needs to be given write permissions on the Hive table in order to
+create partitions.
+
+<h3>Batching Transactions:</h3> Transactions are implemented slightly
+differently than traditional database systems. Multiple transactions
+are grouped into a <i>Transaction Batch</i> and each transaction has
+an id. Data from each transaction batch gets a single file on HDFS,
+which eventually gets compacted with other files into a larger file
+automatically for efficiency.
+
+<h3>Basic Steps:</h3> After a connection is established, a streaming
+client first requests a new batch of transactions. In response it
+receives a set of transaction ids that are part of the transaction
+batch. Subsequently the client proceeds to consume one transaction at
+a time by initiating new transactions. The client will write() one or more
+records per transaction and either commit or abort the current
+transaction before switching to the next one. Each
+<b>TransactionBatch.write()</b> invocation automatically associates
+the I/O attempt with the current transaction id. The user of the
+streaming client needs to have write permissions to the partition or
+table.</p>
+
+<p>
+<b>Concurrency Note:</b> I/O can be performed on multiple
+<b>TransactionBatch</b>s concurrently. However the transactions within a
+transaction batch must be consumed sequentially.</p>
+
+<h2>Writing Data</h2>
+
+<p>
+These classes and interfaces provide support for writing the data to
+Hive within a transaction.
+<a href="RecordWriter.html"><b>RecordWriter</b></a> is the interface
+implemented by all writers. A writer is responsible for taking a
+record in the form of a <b>byte[]</b> containing data in a known
+format (e.g. CSV) and writing it out in the format supported by Hive
+streaming. A <b>RecordWriter</b> may reorder or drop fields from the incoming
+record if necessary to map them to the corresponding columns in the
+Hive Table. A streaming client will instantiate an appropriate
+<b>RecordWriter</b> type and pass it to
+<b>StreamingConnection.fetchTransactionBatch()</b>. The streaming client
+does not directly interact with the <b>RecordWriter</b> thereafter, but
+relies on the <b>TransactionBatch</b> to do so.</p>
+
+<p>
+Currently, out of the box, the streaming API provides two
+implementations of the <b>RecordWriter</b> interface. One handles delimited
+input data (such as CSV, tab separated, etc.) and the other handles JSON
+(strict syntax). Support for other input formats can be provided by
+additional implementations of the <b>RecordWriter</b> interface.
+<ul>
+<li> <a href="DelimitedInputWriter.html"><b>DelimitedInputWriter</b></a>
+- Delimited text input.</li>
+<li> <a href="StrictJsonWriter.html"><b>StrictJsonWriter</b></a>
+- JSON text input.</li>
+</ul></p>
+
+</body>
+
+</html>
Added: hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.streaming;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.Parser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.LogUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.Random;
+
+/**
+ * A stand alone utility to write data into the streaming ingest interface.
+ */
+public class StreamingIntegrationTester {
+
+ static final private Log LOG = LogFactory.getLog(StreamingIntegrationTester.class.getName());
+
+  /**
+   * Command line entry point. Initializes Hive log4j, declares and parses the
+   * command line options, then builds a StreamingIntegrationTester from them and
+   * calls go(). On a parse error or a non-numeric value for a numeric option the
+   * problem is printed to stderr followed by the usage text.
+   *
+   * @param args command line arguments
+   */
+  public static void main(String[] args) {
+
+    try {
+      LogUtils.initHiveLog4j();
+    } catch (LogUtils.LogInitializationException e) {
+      System.err.println("Unable to initialize log4j " + StringUtils.stringifyException(e));
+      System.exit(-1);
+    }
+
+    Options options = new Options();
+
+    options.addOption(OptionBuilder
+        .hasArg()
+        .withArgName("abort-pct")
+        .withDescription("Percentage of transactions to abort, defaults to 5")
+        .withLongOpt("abortpct")
+        .create('a'));
+
+    options.addOption(OptionBuilder
+        .hasArgs()
+        .withArgName("column-names")
+        .withDescription("column names of table to write to")
+        .withLongOpt("columns")
+        .withValueSeparator(',')
+        .isRequired()
+        .create('c'));
+
+    options.addOption(OptionBuilder
+        .hasArg()
+        .withArgName("database")
+        .withDescription("Database of table to write to")
+        .withLongOpt("database")
+        .isRequired()
+        .create('d'));
+
+    options.addOption(OptionBuilder
+        .hasArg()
+        .withArgName("frequency")
+        .withDescription("How often to commit a transaction, in seconds, defaults to 1")
+        .withLongOpt("frequency")
+        .create('f'));
+
+    options.addOption(OptionBuilder
+        .hasArg()
+        .withArgName("iterations")
+        .withDescription("Number of batches to write, defaults to 10")
+        .withLongOpt("num-batches")
+        .create('i'));
+
+    options.addOption(OptionBuilder
+        .hasArg()
+        .withArgName("metastore-uri")
+        .withDescription("URI of Hive metastore")
+        .withLongOpt("metastore-uri")
+        .isRequired()
+        .create('m'));
+
+    options.addOption(OptionBuilder
+        .hasArg()
+        .withArgName("num_transactions")
+        .withDescription("Number of transactions per batch, defaults to 100")
+        .withLongOpt("num-txns")
+        .create('n'));
+
+    options.addOption(OptionBuilder
+        .hasArgs()
+        .withArgName("partition-values")
+        .withDescription("partition values, must be provided in order of partition columns, " +
+            "if not provided table is assumed to not be partitioned")
+        .withLongOpt("partition")
+        .withValueSeparator(',')
+        .create('p'));
+
+    options.addOption(OptionBuilder
+        .hasArg()
+        .withArgName("records-per-transaction")
+        .withDescription("records to write in each transaction, defaults to 100")
+        .withLongOpt("records-per-txn")
+        .withValueSeparator(',')
+        .create('r'));
+
+    options.addOption(OptionBuilder
+        .hasArgs()
+        .withArgName("column-types")
+        .withDescription("column types, valid values are string, int, float, decimal, date, " +
+            "datetime")
+        .withLongOpt("schema")
+        .withValueSeparator(',')
+        .isRequired()
+        .create('s'));
+
+    options.addOption(OptionBuilder
+        .hasArg()
+        .withArgName("table")
+        .withDescription("Table to write to")
+        .withLongOpt("table")
+        .isRequired()
+        .create('t'));
+
+    options.addOption(OptionBuilder
+        .hasArg()
+        .withArgName("num-writers")
+        .withDescription("Number of writers to create, defaults to 2")
+        .withLongOpt("writers")
+        .create('w'));
+
+    options.addOption(OptionBuilder
+        .hasArg(false)
+        .withArgName("pause")
+        .withDescription("Wait on keyboard input after commit & batch close. default: disabled")
+        .withLongOpt("pause")
+        .create('x'));
+
+
+    Parser parser = new GnuParser();
+    CommandLine cmdline;
+    try {
+      cmdline = parser.parse(options, args);
+    } catch (ParseException e) {
+      System.err.println(e.getMessage());
+      usage(options);
+      // usage() exits the JVM; the return guarantees cmdline is never used unassigned.
+      return;
+    }
+
+    boolean pause = cmdline.hasOption('x');
+    String db = cmdline.getOptionValue('d');
+    String table = cmdline.getOptionValue('t');
+    String uri = cmdline.getOptionValue('m');
+
+    int txnsPerBatch;
+    int writers;
+    int batches;
+    int recordsPerTxn;
+    int frequency;
+    int ap;
+    try {
+      txnsPerBatch = Integer.parseInt(cmdline.getOptionValue('n', "100"));
+      writers = Integer.parseInt(cmdline.getOptionValue('w', "2"));
+      batches = Integer.parseInt(cmdline.getOptionValue('i', "10"));
+      recordsPerTxn = Integer.parseInt(cmdline.getOptionValue('r', "100"));
+      frequency = Integer.parseInt(cmdline.getOptionValue('f', "1"));
+      ap = Integer.parseInt(cmdline.getOptionValue('a', "5"));
+    } catch (NumberFormatException e) {
+      // Report a bad numeric argument like any other command line error.
+      System.err.println("Invalid numeric option value: " + e.getMessage());
+      usage(options);
+      return;
+    }
+    // The abort percentage is handed on as a fraction.
+    float abortPct = ((float) ap) / 100.0f;
+    String[] partVals = cmdline.getOptionValues('p');
+    String[] cols = cmdline.getOptionValues('c');
+    String[] types = cmdline.getOptionValues('s');
+
+    StreamingIntegrationTester sit = new StreamingIntegrationTester(db, table, uri,
+        txnsPerBatch, writers, batches, recordsPerTxn, frequency, abortPct, partVals, cols, types
+        , pause);
+    sit.go();
+  }
+
+ /**
+ * Prints the command line help for the given options and terminates the JVM
+ * with exit status -1.
+ * @param options the options to describe in the help text
+ */
+ static void usage(Options options) {
+ HelpFormatter hf = new HelpFormatter();
+ hf.printHelp(HelpFormatter.DEFAULT_WIDTH, "sit [options]", "Usage: ", options, "");
+ // never returns normally: the process ends here
+ System.exit(-1);
+ }
+
+  // Settings captured from the command line; assigned once in the constructor.
+  private final String db;
+  private final String table;
+  private final String uri;
+  private final int txnsPerBatch;
+  private final int writers;
+  private final int batches;
+  private final int recordsPerTxn;
+  private final int frequency;
+  // fraction of transactions to abort (main divides the -a percentage by 100)
+  private final float abortPct;
+  // null when no partition values were given on the command line
+  private final String[] partVals;
+  private final String[] cols;
+  private final String[] types;
+  private final boolean pause;
+
+
+  /**
+   * Stores the given settings; nothing is connected or started here.
+   *
+   * @param db database of the table to write to
+   * @param table table to write to
+   * @param uri URI of the Hive metastore
+   * @param txnsPerBatch number of transactions per batch
+   * @param writers number of writer threads to create
+   * @param batches number of batches each writer writes
+   * @param recordsPerTxn records to write in each transaction
+   * @param frequency pacing interval as given by the -f option
+   * @param abortPct fraction of transactions to abort
+   * @param partVals partition values, or null for an unpartitioned table
+   * @param cols column names of the table
+   * @param types column types used to generate record values
+   * @param pause whether writers wait for keyboard input after commit and batch close
+   */
+  private StreamingIntegrationTester(String db, String table, String uri, int txnsPerBatch,
+                                     int writers, int batches, int recordsPerTxn,
+                                     int frequency, float abortPct, String[] partVals,
+                                     String[] cols, String[] types, boolean pause) {
+    this.db = db;
+    this.table = table;
+    this.uri = uri;
+    this.txnsPerBatch = txnsPerBatch;
+    this.writers = writers;
+    this.batches = batches;
+    this.recordsPerTxn = recordsPerTxn;
+    this.frequency = frequency;
+    this.abortPct = abortPct;
+    this.partVals = partVals;
+    this.cols = cols;
+    this.types = types;
+    this.pause = pause;
+  }
+
+  /**
+   * Builds the end point (with a null partition list when no partition values
+   * were supplied) and starts one Writer thread per configured writer. The
+   * threads are started but not waited for. Anything thrown while setting up
+   * is printed to stderr.
+   */
+  private void go() {
+    try {
+      HiveEndPoint target = new HiveEndPoint(uri, db, table,
+          partVals == null ? null : Arrays.asList(partVals));
+
+      int writerNo = 0;
+      while (writerNo < writers) {
+        new Writer(target, writerNo, txnsPerBatch, batches, recordsPerTxn, frequency, abortPct,
+            cols, types, pause).start();
+        writerNo++;
+      }
+
+    } catch (Throwable problem) {
+      System.err.println("Caught exception while testing: "
+          + StringUtils.stringifyException(problem));
+    }
+  }
+
+  /**
+   * Thread that opens its own streaming connection to the end point and writes
+   * generated records through a DelimitedInputWriter, one transaction batch
+   * after another.
+   */
+  private static class Writer extends Thread {
+    private final HiveEndPoint endPoint;
+    private final int txnsPerBatch;
+    private final int batches;
+    private final int writerNumber;
+    private final int recordsPerTxn;
+    // pacing interval; documented as seconds by the -f command line option
+    private final int frequency;
+    // fraction of transactions to abort instead of commit
+    private final float abortPct;
+    private final String[] cols;
+    private final String[] types;
+    private final boolean pause;
+    private final Random rand;
+
+    Writer(HiveEndPoint endPoint, int writerNumber, int txnsPerBatch, int batches,
+           int recordsPerTxn, int frequency, float abortPct, String[] cols, String[] types
+           , boolean pause) {
+      this.endPoint = endPoint;
+      this.txnsPerBatch = txnsPerBatch;
+      this.batches = batches;
+      this.writerNumber = writerNumber;
+      this.recordsPerTxn = recordsPerTxn;
+      this.frequency = frequency;
+      this.abortPct = abortPct;
+      this.cols = cols;
+      this.types = types;
+      this.pause = pause;
+      rand = new Random();
+    }
+
+    /**
+     * Writes {@code batches} transaction batches. Within a batch every
+     * transaction receives {@code recordsPerTxn} generated records and is then
+     * randomly aborted (with probability {@code abortPct}) or committed. Each
+     * batch is stretched to last at least {@code frequency} seconds. Any
+     * failure is printed to stderr and ends this writer; the connection is
+     * closed in all cases.
+     */
+    @Override
+    public void run() {
+      StreamingConnection conn = null;
+      try {
+        conn = endPoint.newConnection(true);
+        RecordWriter writer = new DelimitedInputWriter(cols, ",", endPoint);
+
+        // frequency is in seconds, while currentTimeMillis()/sleep() work in
+        // milliseconds; convert once up front.
+        final long minBatchMillis = frequency * 1000L;
+        for (int i = 0; i < batches; i++) {
+          // Restart the clock for every batch so pacing applies to each of
+          // them, not just to the first one.
+          long start = System.currentTimeMillis();
+          LOG.info("Starting batch " + i);
+          TransactionBatch batch = conn.fetchTransactionBatch(txnsPerBatch, writer);
+          try {
+            while (batch.remainingTransactions() > 0) {
+              batch.beginNextTransaction();
+              for (int j = 0; j < recordsPerTxn; j++) {
+                batch.write(generateRecord(cols, types));
+              }
+              if (rand.nextFloat() < abortPct) {
+                batch.abort();
+              } else {
+                batch.commit();
+              }
+              if (pause) {
+                System.out.println("Writer " + writerNumber +
+                    " committed... press Enter to continue. " + Thread.currentThread().getId());
+                System.in.read();
+              }
+            }
+            long elapsed = System.currentTimeMillis() - start;
+            if (elapsed < minBatchMillis) {
+              Thread.sleep(minBatchMillis - elapsed);
+            }
+          } finally {
+            batch.close();
+            if (pause) {
+              System.out.println("Writer " + writerNumber +
+                  " has closed a Batch.. press Enter to continue. " + Thread.currentThread().getId());
+              System.in.read();
+            }
+          }
+        }
+      } catch (Throwable t) {
+        System.err.println("Writer number " + writerNumber
+            + " caught exception while testing: " + StringUtils.stringifyException(t));
+      } finally {
+        if (conn != null) {
+          conn.close();
+        }
+      }
+    }
+
+    /**
+     * Builds one comma separated record with a generated value per entry of
+     * {@code types}. Every value, including the last, is followed by a comma.
+     *
+     * @param cols column names (currently unused, see TODO)
+     * @param types column types, one generated value each
+     * @return the record encoded as UTF-8 bytes
+     */
+    private byte[] generateRecord(String[] cols, String[] types) {
+      // TODO make it so I can randomize the column order
+
+      StringBuilder buf = new StringBuilder();
+      for (int i = 0; i < types.length; i++) {
+        buf.append(generateColumn(types[i]));
+        buf.append(",");
+      }
+      // Encode explicitly instead of relying on the platform default charset.
+      return buf.toString().getBytes(java.nio.charset.Charset.forName("UTF-8"));
+    }
+
+    /**
+     * Returns a fixed sample value for the given column type. Matching is
+     * case-insensitive: "string", anything starting with "int", anything
+     * starting with "dec" or equal to "float", "datetime" and "date".
+     *
+     * @param type column type name
+     * @return sample value for that type
+     * @throws RuntimeException if the type is not recognized
+     */
+    private String generateColumn(String type) {
+      // Locale.ROOT keeps the lower-casing independent of the default locale
+      // (e.g. the Turkish dotless i would otherwise break the "int" match).
+      String normalized = type.toLowerCase(java.util.Locale.ROOT);
+      if ("string".equals(normalized)) {
+        return "When that Aprilis with his showers swoot";
+      } else if (normalized.startsWith("int")) {
+        return "42";
+      } else if (normalized.startsWith("dec") || normalized.equals("float")) {
+        return "3.141592654";
+      } else if (normalized.equals("datetime")) {
+        return "2014-03-07 15:33:22";
+      } else if (normalized.equals("date")) {
+        return "1955-11-12";
+      } else {
+        throw new RuntimeException("Sorry, I don't know the type " + type);
+      }
+    }
+  }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestDelimitedInputWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestDelimitedInputWriter.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestDelimitedInputWriter.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestDelimitedInputWriter.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import com.google.common.collect.Lists;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+/**
+ * Unit test for the field-name to column-index mapping produced by
+ * {@code DelimitedInputWriter.getFieldReordering}.
+ */
+public class TestDelimitedInputWriter {
+
+  /**
+   * Checks the mapping for dropped fields, reversed fields, unknown field
+   * names, fewer fields than columns and more fields than columns.
+   */
+  @Test
+  public void testFieldReordering() throws Exception {
+    ArrayList<String> tableColumns =
+        Lists.newArrayList("col1", "col2", "col3", "col4", "col5");
+
+    // 1) dropping fields - first, middle & last
+    String[] withDroppedFields = {null, "col2", null, "col4", null};
+    int[] droppedMapping =
+        DelimitedInputWriter.getFieldReordering(withDroppedFields, tableColumns);
+    Assert.assertTrue(Arrays.equals(droppedMapping, new int[]{-1, 1, -1, 3, -1}));
+
+    // 2) reordering
+    String[] reversedFields = {"col5", "col4", "col3", "col2", "col1"};
+    int[] reversedMapping =
+        DelimitedInputWriter.getFieldReordering(reversedFields, tableColumns);
+    Assert.assertTrue(Arrays.equals(reversedMapping, new int[]{4, 3, 2, 1, 0}));
+
+    // 3) bad field names
+    String[] unknownFields = {"xyz", "abc", "col3", "col4", "as"};
+    try {
+      DelimitedInputWriter.getFieldReordering(unknownFields, tableColumns);
+      Assert.fail();
+    } catch (InvalidColumn expected) {
+      // should throw
+    }
+
+    // 4) fewer field names than table columns
+    String[] fewFields = {"col3", "col4"};
+    int[] fewMapping = DelimitedInputWriter.getFieldReordering(fewFields, tableColumns);
+    Assert.assertTrue(Arrays.equals(fewMapping, new int[]{2, 3}));
+
+    // 5) more field names than table columns
+    String[] extraFields = {"col5", "col4", "col3", "col2", "col1", "col1"};
+    try {
+      DelimitedInputWriter.getFieldReordering(extraFields, tableColumns);
+      Assert.fail();
+    } catch (InvalidColumn expected) {
+      // should throw
+    }
+  }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java Thu Apr 10 01:08:59 2014
@@ -0,0 +1,800 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class TestStreaming {
+
+ /**
+  * Local file system registered under the "raw" scheme whose file status is
+  * derived directly from {@link File} (size, directory flag, modification
+  * time and an approximation of the permission bits).
+  */
+ public static class RawFileSystem extends RawLocalFileSystem {
+   private static final URI NAME = rawUri();
+
+   /** Creates the fixed URI reported by {@link #getUri()}. */
+   private static URI rawUri() {
+     try {
+       return new URI("raw:///");
+     } catch (URISyntaxException se) {
+       throw new IllegalArgumentException("bad uri", se);
+     }
+   }
+
+   @Override
+   public URI getUri() {
+     return NAME;
+   }
+
+   /**
+    * Builds a {@link FileStatus} for {@code path} from the local file.
+    *
+    * @throws FileNotFoundException if the file does not exist
+    */
+   @Override
+   public FileStatus getFileStatus(Path path) throws IOException {
+     File localFile = pathToFile(path);
+     if (!localFile.exists()) {
+       throw new FileNotFoundException("Can't find " + path);
+     }
+     // get close enough: read -> r for all, write -> w for owner, execute -> x for all
+     int permissionBits = 0;
+     if (localFile.canRead()) {
+       permissionBits |= 0444;
+     }
+     if (localFile.canWrite()) {
+       permissionBits |= 0200;
+     }
+     if (localFile.canExecute()) {
+       permissionBits |= 0111;
+     }
+     FsPermission permission = FsPermission.createImmutable((short) permissionBits);
+     long length = localFile.length();
+     boolean isDir = localFile.isDirectory();
+     long modificationTime = localFile.lastModified();
+     long accessTime = localFile.lastModified();
+     return new FileStatus(length, isDir, 1, 1024, modificationTime, accessTime,
+         permission, "owen", "users", path);
+   }
+ }
+
+ // Names of the two table columns.
+ private static final String COL1 = "id";
+ private static final String COL2 = "msg";
+
+ private final HiveConf conf;
+ private final IMetaStoreClient msClient;
+
+ // Always null here; the constructor only sets METASTOREURIS when it is non-null.
+ final String metaStoreURI = null;
+
+ // partitioned table
+ private static final String dbName = "testing";
+ private static final String tblName = "alerts";
+ private static final String[] fieldNames = {COL1, COL2};
+ List<String> partitionVals;
+ // Location of the partition created last by createDbAndTable().
+ private static String partLocation;
+
+ // unpartitioned table
+ // NOTE: the names below are identical to dbName/tblName above.
+ private static final String dbName2 = "testing";
+ private static final String tblName2 = "alerts";
+ private static final String[] fieldNames2 = {COL1, COL2};
+
+ // Partition values (continent, country) used for the default partition.
+ private final String PART1_CONTINENT = "Asia";
+ private final String PART1_COUNTRY = "India";
+
+ // Temporary folder that holds the database directories.
+ @Rule
+ public TemporaryFolder dbFolder = new TemporaryFolder();
+
+
+ /**
+  * Prepares the configuration, resets the transaction database and opens a
+  * metastore client.
+  */
+ public TestStreaming() throws Exception {
+   List<String> defaultPartition = new ArrayList<String>(2);
+   defaultPartition.add(PART1_CONTINENT);
+   defaultPartition.add(PART1_COUNTRY);
+   partitionVals = defaultPartition;
+
+   conf = new HiveConf(getClass());
+   // register the RawFileSystem for the "raw" scheme
+   conf.set("fs.raw.impl", RawFileSystem.class.getName());
+   TxnDbUtil.setConfValues(conf);
+   if (metaStoreURI != null) {
+     conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
+   }
+   conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+   conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+
+   // 1) Start from a clean slate (metastore)
+   TxnDbUtil.cleanDb();
+   TxnDbUtil.prepDb();
+
+   // 2) obtain metastore clients
+   msClient = new HiveMetaStoreClient(conf);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ // drop and recreate the necessary databases and tables
+ dropDB(msClient, dbName);
+ createDbAndTable(msClient, dbName, tblName, partitionVals);
+
+ dropDB(msClient, dbName2);
+ createDbAndTable(msClient, dbName2, tblName2, partitionVals);
+ }
+
+ /**
+  * Returns the partition key schema of the test table: two string columns,
+  * "continent" followed by "country".
+  */
+ private static List<FieldSchema> getPartitionKeys() {
+   // Defining partition names in unsorted order
+   String[] keyNames = {"continent", "country"};
+   List<FieldSchema> partitionKeys = new ArrayList<FieldSchema>();
+   for (String keyName : keyNames) {
+     partitionKeys.add(new FieldSchema(keyName, serdeConstants.STRING_TYPE_NAME, ""));
+   }
+   return partitionKeys;
+ }
+
+ /**
+  * Asserts the state of the partition directory at {@code partLocation} and
+  * the rows that can be read back from it.
+  *
+  * @param minTxn           expected smallest min-transaction id over all current deltas
+  * @param maxTxn           expected largest max-transaction id over all current deltas
+  * @param buckets          value stored in the job under "bucket_count"
+  * @param numExpectedFiles expected number of current delta directories
+  * @param records          expected rows, in read order, compared against
+  *                         {@code OrcStruct.toString()}
+  */
+ private void checkDataWritten(long minTxn, long maxTxn, int buckets, int numExpectedFiles,
+     String... records) throws Exception {
+   ValidTxnList txns = msClient.getValidTxns();
+   AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), conf, txns);
+   Assert.assertEquals(0, dir.getObsolete().size());
+   Assert.assertEquals(0, dir.getOriginalFiles().size());
+   List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+   System.out.println("Files found: ");
+   for (AcidUtils.ParsedDelta pd : current) {
+     System.out.println(pd.getPath().toString());
+   }
+   Assert.assertEquals(numExpectedFiles, current.size());
+
+   // find the absolute minimum and maximum transaction over all deltas
+   long min = Long.MAX_VALUE;
+   long max = Long.MIN_VALUE;
+   for (AcidUtils.ParsedDelta pd : current) {
+     if (pd.getMaxTransaction() > max) {
+       max = pd.getMaxTransaction();
+     }
+     if (pd.getMinTransaction() < min) {
+       min = pd.getMinTransaction();
+     }
+   }
+   Assert.assertEquals(minTxn, min);
+   Assert.assertEquals(maxTxn, max);
+
+   InputFormat<NullWritable, OrcStruct> inf = new OrcInputFormat();
+   JobConf job = new JobConf();
+   job.set("mapred.input.dir", partLocation);
+   job.set("bucket_count", Integer.toString(buckets));
+   job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+   InputSplit[] splits = inf.getSplits(job, 1);
+   Assert.assertEquals(1, splits.length);
+   org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
+       inf.getRecordReader(splits[0], job, Reporter.NULL);
+
+   // Close the reader even when an assertion fails so it is not leaked
+   // into the tests that run afterwards.
+   try {
+     NullWritable key = rr.createKey();
+     OrcStruct value = rr.createValue();
+     for (int i = 0; i < records.length; i++) {
+       Assert.assertEquals(true, rr.next(key, value));
+       Assert.assertEquals(records[i], value.toString());
+     }
+     // no rows beyond the expected ones
+     Assert.assertEquals(false, rr.next(key, value));
+   } finally {
+     rr.close();
+   }
+ }
+
+ /**
+  * Asserts that the partition directory at {@code partLocation} contains no
+  * obsolete files, no original files and no current delta directories.
+  */
+ private void checkNothingWritten() throws Exception {
+   ValidTxnList validTxns = msClient.getValidTxns();
+   Path partitionPath = new Path(partLocation);
+   AcidUtils.Directory acidState = AcidUtils.getAcidState(partitionPath, conf, validTxns);
+   Assert.assertEquals(0, acidState.getObsolete().size());
+   Assert.assertEquals(0, acidState.getOriginalFiles().size());
+   List<AcidUtils.ParsedDelta> deltas = acidState.getCurrentDirectories();
+   Assert.assertEquals(0, deltas.size());
+ }
+
+ /**
+  * Opens and closes a connection for an end point with partition values and
+  * for one without; neither may throw.
+  */
+ @Test
+ public void testEndpointConnection() throws Exception {
+   // 1) Basic
+   HiveEndPoint partitionEndPt =
+       new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+   StreamingConnection partitionConn = partitionEndPt.newConnection(false, null); // shouldn't throw
+   partitionConn.close();
+
+   // 2) Leave partition unspecified
+   HiveEndPoint tableEndPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
+   StreamingConnection tableConn = tableEndPt.newConnection(false, null); // should not throw
+   tableConn.close();
+ }
+
+ /**
+  * Verifies that opening a connection with {@code newConnection(true, null)}
+  * for a partition that does not exist yet results in that partition being
+  * present in the metastore.
+  */
+ @Test
+ public void testAddPartition() throws Exception {
+   List<String> newPartVals = new ArrayList<String>(2);
+   newPartVals.add(PART1_CONTINENT);
+   newPartVals.add("Nepal");
+
+   HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
+       , newPartVals);
+
+   // Ensure partition is absent
+   try {
+     msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
+     Assert.fail("Partition already exists");
+   } catch (NoSuchObjectException e) {
+     // expect this exception
+   }
+
+   // Create partition
+   StreamingConnection connection = endPt.newConnection(true, null);
+   try {
+     Assert.assertNotNull(connection);
+
+     // Ensure partition is present
+     Partition p = msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
+     Assert.assertNotNull("Did not find added partition", p);
+   } finally {
+     // The connection was previously never closed and leaked past the test.
+     if (connection != null) {
+       connection.close();
+     }
+   }
+ }
+
+ /**
+  * Commits a transaction without writing any record and checks that the
+  * batch reports COMMITTED, once for each of the two end points.
+  */
+ @Test
+ public void testTransactionBatchEmptyCommit() throws Exception {
+   // 1) to partitioned table
+   HiveEndPoint partEndPt =
+       new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+   DelimitedInputWriter partWriter = new DelimitedInputWriter(fieldNames, ",", partEndPt);
+   StreamingConnection partConn = partEndPt.newConnection(false, null);
+
+   TransactionBatch partBatch = partConn.fetchTransactionBatch(10, partWriter);
+   partBatch.beginNextTransaction();
+   partBatch.commit();
+   Assert.assertEquals(TransactionBatch.TxnState.COMMITTED,
+       partBatch.getCurrentTransactionState());
+   partBatch.close();
+   partConn.close();
+
+   // 2) To unpartitioned table
+   HiveEndPoint tableEndPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+   DelimitedInputWriter tableWriter = new DelimitedInputWriter(fieldNames2, ",", tableEndPt);
+   StreamingConnection tableConn = tableEndPt.newConnection(false, null);
+
+   TransactionBatch tableBatch = tableConn.fetchTransactionBatch(10, tableWriter);
+   tableBatch.beginNextTransaction();
+   tableBatch.commit();
+   Assert.assertEquals(TransactionBatch.TxnState.COMMITTED,
+       tableBatch.getCurrentTransactionState());
+   tableBatch.close();
+   tableConn.close();
+ }
+
+ /**
+  * Aborts a transaction without writing any record and checks that the
+  * batch reports ABORTED, once for each of the two end points.
+  */
+ @Test
+ public void testTransactionBatchEmptyAbort() throws Exception {
+   // 1) to partitioned table
+   HiveEndPoint partEndPt =
+       new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+   DelimitedInputWriter partWriter = new DelimitedInputWriter(fieldNames, ",", partEndPt);
+   StreamingConnection partConn = partEndPt.newConnection(true);
+
+   TransactionBatch partBatch = partConn.fetchTransactionBatch(10, partWriter);
+   partBatch.beginNextTransaction();
+   partBatch.abort();
+   Assert.assertEquals(TransactionBatch.TxnState.ABORTED,
+       partBatch.getCurrentTransactionState());
+   partBatch.close();
+   partConn.close();
+
+   // 2) to unpartitioned table
+   HiveEndPoint tableEndPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+   DelimitedInputWriter tableWriter = new DelimitedInputWriter(fieldNames, ",", tableEndPt);
+   StreamingConnection tableConn = tableEndPt.newConnection(true);
+
+   TransactionBatch tableBatch = tableConn.fetchTransactionBatch(10, tableWriter);
+   tableBatch.beginNextTransaction();
+   tableBatch.abort();
+   Assert.assertEquals(TransactionBatch.TxnState.ABORTED,
+       tableBatch.getCurrentTransactionState());
+   tableBatch.close();
+   tableConn.close();
+ }
+
+ /**
+  * Writes delimited records in two consecutive transactions of one batch and
+  * checks transaction states and visible data after each step; then repeats
+  * a single committed write through an end point without partition values.
+  */
+ @Test
+ public void testTransactionBatchCommit_Delimited() throws Exception {
+   HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+       partitionVals);
+   DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);
+   StreamingConnection connection = endPt.newConnection(true);
+
+   // 1st Txn
+   TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+   txnBatch.beginNextTransaction();
+   Assert.assertEquals(TransactionBatch.TxnState.OPEN
+       , txnBatch.getCurrentTransactionState());
+   txnBatch.write("1,Hello streaming".getBytes());
+   txnBatch.commit();
+
+   checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+
+   Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+       , txnBatch.getCurrentTransactionState());
+
+   // 2nd Txn
+   txnBatch.beginNextTransaction();
+   Assert.assertEquals(TransactionBatch.TxnState.OPEN
+       , txnBatch.getCurrentTransactionState());
+   txnBatch.write("2,Welcome to streaming".getBytes());
+
+   // data should not be visible
+   checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+
+   txnBatch.commit();
+
+   checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
+       "{2, Welcome to streaming}");
+
+   txnBatch.close();
+   Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+       , txnBatch.getCurrentTransactionState());
+
+   connection.close();
+
+   // To Unpartitioned table
+   endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+   writer = new DelimitedInputWriter(fieldNames, ",", endPt);
+   connection = endPt.newConnection(true);
+
+   // 1st Txn
+   txnBatch = connection.fetchTransactionBatch(10, writer);
+   txnBatch.beginNextTransaction();
+   Assert.assertEquals(TransactionBatch.TxnState.OPEN
+       , txnBatch.getCurrentTransactionState());
+   txnBatch.write("1,Hello streaming".getBytes());
+   txnBatch.commit();
+
+   Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+       , txnBatch.getCurrentTransactionState());
+   // Close the batch before its connection; it was previously left open.
+   txnBatch.close();
+   connection.close();
+ }
+
+ @Test
+ public void testTransactionBatchCommit_Json() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ StrictJsonWriter writer = new StrictJsonWriter(endPt);
+ StreamingConnection connection = endPt.newConnection(true);
+
+ // 1st Txn
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ String rec1 = "{\"id\" : 1, \"msg\": \"Hello streaming\"}";
+ txnBatch.write(rec1.getBytes());
+ txnBatch.commit();
+
+ checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+
+ txnBatch.close();
+ Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+ , txnBatch.getCurrentTransactionState());
+
+ connection.close();
+ }
+
+ /**
+  * Consumes every transaction of a batch and checks that
+  * {@code remainingTransactions()} decreases by one per transaction and ends
+  * at zero, first committing each transaction and then aborting each one.
+  */
+ @Test
+ public void testRemainingTransactions() throws Exception {
+   HiveEndPoint endPt =
+       new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+   DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);
+   StreamingConnection connection = endPt.newConnection(true);
+
+   // 1) test with txn.Commit()
+   TransactionBatch commitBatch = connection.fetchTransactionBatch(10, writer);
+   int expectedRemaining = commitBatch.remainingTransactions();
+   for (int batch = 0; commitBatch.remainingTransactions() > 0; ++batch) {
+     commitBatch.beginNextTransaction();
+     expectedRemaining--;
+     Assert.assertEquals(expectedRemaining, commitBatch.remainingTransactions());
+     for (int rec = 0; rec < 2; ++rec) {
+       Assert.assertEquals(TransactionBatch.TxnState.OPEN,
+           commitBatch.getCurrentTransactionState());
+       String line = batch * rec + ",Hello streaming";
+       commitBatch.write(line.getBytes());
+     }
+     commitBatch.commit();
+     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED,
+         commitBatch.getCurrentTransactionState());
+   }
+   Assert.assertEquals(0, commitBatch.remainingTransactions());
+   commitBatch.close();
+
+   Assert.assertEquals(TransactionBatch.TxnState.INACTIVE,
+       commitBatch.getCurrentTransactionState());
+
+   // 2) test with txn.Abort()
+   TransactionBatch abortBatch = connection.fetchTransactionBatch(10, writer);
+   expectedRemaining = abortBatch.remainingTransactions();
+   for (int batch = 0; abortBatch.remainingTransactions() > 0; ++batch) {
+     abortBatch.beginNextTransaction();
+     expectedRemaining--;
+     Assert.assertEquals(expectedRemaining, abortBatch.remainingTransactions());
+     for (int rec = 0; rec < 2; ++rec) {
+       Assert.assertEquals(TransactionBatch.TxnState.OPEN,
+           abortBatch.getCurrentTransactionState());
+       String line = batch * rec + ",Hello streaming";
+       abortBatch.write(line.getBytes());
+     }
+     abortBatch.abort();
+     Assert.assertEquals(TransactionBatch.TxnState.ABORTED,
+         abortBatch.getCurrentTransactionState());
+   }
+   Assert.assertEquals(0, abortBatch.remainingTransactions());
+   abortBatch.close();
+
+   Assert.assertEquals(TransactionBatch.TxnState.INACTIVE,
+       abortBatch.getCurrentTransactionState());
+
+   connection.close();
+ }
+
+ @Test
+ public void testTransactionBatchAbort() throws Exception {
+
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+ StreamingConnection connection = endPt.newConnection(false);
+
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.write("2,Welcome to streaming".getBytes());
+ txnBatch.abort();
+
+ checkNothingWritten();
+
+ Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+ , txnBatch.getCurrentTransactionState());
+
+ txnBatch.close();
+ connection.close();
+
+ checkNothingWritten();
+
+ }
+
+
+ /**
+  * Aborts a transaction holding two records, then writes the same two
+  * records in the next transaction of the batch and commits; only the
+  * committed records must be readable.
+  */
+ @Test
+ public void testTransactionBatchAbortAndCommit() throws Exception {
+   HiveEndPoint endPoint =
+       new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+   DelimitedInputWriter recordWriter = new DelimitedInputWriter(fieldNames, ",", endPoint);
+   StreamingConnection conn = endPoint.newConnection(false);
+
+   // aborted transaction
+   TransactionBatch batch = conn.fetchTransactionBatch(10, recordWriter);
+   batch.beginNextTransaction();
+   batch.write("1,Hello streaming".getBytes());
+   batch.write("2,Welcome to streaming".getBytes());
+   batch.abort();
+
+   checkNothingWritten();
+
+   Assert.assertEquals(TransactionBatch.TxnState.ABORTED,
+       batch.getCurrentTransactionState());
+
+   // committed transaction
+   batch.beginNextTransaction();
+   batch.write("1,Hello streaming".getBytes());
+   batch.write("2,Welcome to streaming".getBytes());
+   batch.commit();
+
+   checkDataWritten(1, 10, 1, 1,
+       "{1, Hello streaming}",
+       "{2, Welcome to streaming}");
+
+   batch.close();
+   conn.close();
+ }
+
+ /**
+  * Commits two transactions in each of two consecutive batches on the same
+  * connection and checks the readable data after every commit.
+  */
+ @Test
+ public void testMultipleTransactionBatchCommits() throws Exception {
+   HiveEndPoint endPoint =
+       new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+   DelimitedInputWriter recordWriter = new DelimitedInputWriter(fieldNames, ",", endPoint);
+   StreamingConnection conn = endPoint.newConnection(true);
+
+   // 1st Txn Batch
+   TransactionBatch firstBatch = conn.fetchTransactionBatch(10, recordWriter);
+   firstBatch.beginNextTransaction();
+   firstBatch.write("1,Hello streaming".getBytes());
+   firstBatch.commit();
+
+   checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+
+   firstBatch.beginNextTransaction();
+   firstBatch.write("2,Welcome to streaming".getBytes());
+   firstBatch.commit();
+
+   checkDataWritten(1, 10, 1, 1,
+       "{1, Hello streaming}",
+       "{2, Welcome to streaming}");
+
+   firstBatch.close();
+
+   // 2nd Txn Batch
+   TransactionBatch secondBatch = conn.fetchTransactionBatch(10, recordWriter);
+   secondBatch.beginNextTransaction();
+   secondBatch.write("3,Hello streaming - once again".getBytes());
+   secondBatch.commit();
+
+   checkDataWritten(1, 20, 1, 2,
+       "{1, Hello streaming}",
+       "{2, Welcome to streaming}",
+       "{3, Hello streaming - once again}");
+
+   secondBatch.beginNextTransaction();
+   secondBatch.write("4,Welcome to streaming - once again".getBytes());
+   secondBatch.commit();
+
+   checkDataWritten(1, 20, 1, 2,
+       "{1, Hello streaming}",
+       "{2, Welcome to streaming}",
+       "{3, Hello streaming - once again}",
+       "{4, Welcome to streaming - once again}");
+
+   Assert.assertEquals(TransactionBatch.TxnState.COMMITTED,
+       secondBatch.getCurrentTransactionState());
+
+   secondBatch.close();
+
+   conn.close();
+ }
+
+
+ /**
+  * Holds two transaction batches open on one connection, interleaves writes
+  * and commits between them and checks the readable data after each step.
+  */
+ @Test
+ public void testInterleavedTransactionBatchCommits() throws Exception {
+   HiveEndPoint endPoint =
+       new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+   DelimitedInputWriter firstWriter = new DelimitedInputWriter(fieldNames, ",", endPoint);
+   StreamingConnection conn = endPoint.newConnection(false);
+
+   // Acquire 1st Txn Batch
+   TransactionBatch firstBatch = conn.fetchTransactionBatch(10, firstWriter);
+   firstBatch.beginNextTransaction();
+
+   // Acquire 2nd Txn Batch
+   DelimitedInputWriter secondWriter = new DelimitedInputWriter(fieldNames, ",", endPoint);
+   TransactionBatch secondBatch = conn.fetchTransactionBatch(10, secondWriter);
+   secondBatch.beginNextTransaction();
+
+   // Interleaved writes to both batches
+   firstBatch.write("1,Hello streaming".getBytes());
+   secondBatch.write("3,Hello streaming - once again".getBytes());
+
+   checkNothingWritten();
+
+   secondBatch.commit();
+
+   checkDataWritten(11, 20, 1, 1, "{3, Hello streaming - once again}");
+
+   firstBatch.commit();
+
+   checkDataWritten(1, 20, 1, 2,
+       "{1, Hello streaming}",
+       "{3, Hello streaming - once again}");
+
+   firstBatch.beginNextTransaction();
+   firstBatch.write("2,Welcome to streaming".getBytes());
+
+   secondBatch.beginNextTransaction();
+   secondBatch.write("4,Welcome to streaming - once again".getBytes());
+
+   // uncommitted writes are not visible yet
+   checkDataWritten(1, 20, 1, 2,
+       "{1, Hello streaming}",
+       "{3, Hello streaming - once again}");
+
+   firstBatch.commit();
+
+   checkDataWritten(1, 20, 1, 2,
+       "{1, Hello streaming}",
+       "{2, Welcome to streaming}",
+       "{3, Hello streaming - once again}");
+
+   secondBatch.commit();
+
+   checkDataWritten(1, 20, 1, 2,
+       "{1, Hello streaming}",
+       "{2, Welcome to streaming}",
+       "{3, Hello streaming - once again}",
+       "{4, Welcome to streaming - once again}");
+
+   Assert.assertEquals(TransactionBatch.TxnState.COMMITTED,
+       firstBatch.getCurrentTransactionState());
+   Assert.assertEquals(TransactionBatch.TxnState.COMMITTED,
+       secondBatch.getCurrentTransactionState());
+
+   firstBatch.close();
+   secondBatch.close();
+
+   conn.close();
+ }
+
+ class WriterThd extends Thread {
+
+ private StreamingConnection conn;
+ private HiveEndPoint ep;
+ private DelimitedInputWriter writer;
+ private String data;
+
+ WriterThd(HiveEndPoint ep, String data) throws Exception {
+ this.ep = ep;
+ writer = new DelimitedInputWriter(fieldNames, ",", ep);
+ conn = ep.newConnection(false);
+ this.data = data;
+ }
+
+ @Override
+ public void run() {
+ TransactionBatch txnBatch = null;
+ try {
+ txnBatch = conn.fetchTransactionBatch(1000, writer);
+ while (txnBatch.remainingTransactions() > 0) {
+ txnBatch.beginNextTransaction();
+ txnBatch.write(data.getBytes());
+ txnBatch.write(data.getBytes());
+ txnBatch.commit();
+ } // while
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (txnBatch != null) {
+ try {
+ txnBatch.close();
+ } catch (Exception e) {
+ conn.close();
+ throw new RuntimeException(e);
+ }
+ }
+ try {
+ conn.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+ }
+ }
+
+ @Test
+ public void testConcurrentTransactionBatchCommits() throws Exception {
+ final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+ WriterThd t1 = new WriterThd(ep, "1,Matrix");
+ WriterThd t2 = new WriterThd(ep, "2,Gandhi");
+ WriterThd t3 = new WriterThd(ep, "3,Silence");
+
+ t1.start();
+ t2.start();
+ t3.start();
+
+ t1.join();
+ t2.join();
+ t3.join();
+
+ }
+
+ /**
+  * Deletes a database and all tables in it. Any {@code TException} raised
+  * while doing so is ignored.
+  *
+  * @param client       metastore client to use
+  * @param databaseName database to drop
+  */
+ public static void dropDB(IMetaStoreClient client, String databaseName) {
+   try {
+     List<String> tableNames = client.listTableNamesByFilter(databaseName, "", (short) -1);
+     for (String tableName : tableNames) {
+       client.dropTable(databaseName, tableName, true, true);
+     }
+     client.dropDatabase(databaseName);
+   } catch (TException ignored) {
+     // intentionally ignored; presumably the database may not exist yet
+   }
+ }
+
+ /**
+  * Creates a database under a new temporary folder, a managed single-bucket
+  * ORC table in it, and one partition with the given values. The location
+  * of that partition is stored in {@code partLocation}.
+  *
+  * @param client       metastore client to use
+  * @param databaseName database to create
+  * @param tableName    table to create
+  * @param partVals     values of the partition to add
+  */
+ public void createDbAndTable(IMetaStoreClient client, String databaseName,
+     String tableName, List<String> partVals)
+     throws Exception {
+   // database
+   Database database = new Database();
+   database.setName(databaseName);
+   String dbLocation = "raw://" + dbFolder.newFolder(databaseName + ".db").getCanonicalPath();
+   database.setLocationUri(dbLocation);
+   client.createDatabase(database);
+
+   // serde description
+   Map<String, String> serdeParams = new HashMap<String, String>();
+   serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, "1");
+   SerDeInfo serdeInfo = new SerDeInfo();
+   serdeInfo.setName(tableName);
+   serdeInfo.setParameters(serdeParams);
+   serdeInfo.setSerializationLib(OrcSerde.class.getName());
+
+   // storage description
+   StorageDescriptor storage = new StorageDescriptor();
+   storage.setCols(getTableColumns());
+   storage.setNumBuckets(1);
+   storage.setLocation(dbLocation + Path.SEPARATOR + tableName);
+   storage.setBucketCols(new ArrayList<String>(2));
+   storage.setSerdeInfo(serdeInfo);
+   storage.setInputFormat(HiveInputFormat.class.getName());
+   storage.setOutputFormat(OrcOutputFormat.class.getName());
+
+   // table
+   Table table = new Table();
+   table.setDbName(databaseName);
+   table.setTableName(tableName);
+   table.setTableType(TableType.MANAGED_TABLE.toString());
+   table.setPartitionKeys(getPartitionKeys());
+   table.setSd(storage);
+   table.setParameters(new HashMap<String, String>());
+   client.createTable(table);
+
+   // partition
+   try {
+     addPartition(client, table, partVals);
+   } catch (AlreadyExistsException ignored) {
+     // the partition is already there; nothing to add
+   }
+   Partition created = client.getPartition(databaseName, tableName, partVals);
+   partLocation = created.getSd().getLocation();
+ }
+
+ /**
+  * Adds a partition with the given values to {@code tbl}. The partition
+  * reuses a copy of the table's storage descriptor, with its location
+  * extended by the path built by {@code makePartPath}.
+  *
+  * @param client     metastore client to use
+  * @param tbl        table that receives the partition
+  * @param partValues partition values, one per partition key of the table
+  */
+ private static void addPartition(IMetaStoreClient client, Table tbl
+     , List<String> partValues)
+     throws IOException, TException {
+   Partition part = new Partition();
+   part.setDbName(tbl.getDbName());
+   // Take the name from the table argument rather than the tblName constant,
+   // so the partition always belongs to the table that was passed in.
+   part.setTableName(tbl.getTableName());
+   StorageDescriptor sd = new StorageDescriptor(tbl.getSd());
+   sd.setLocation(sd.getLocation() + Path.SEPARATOR + makePartPath(tbl.getPartitionKeys()
+       , partValues));
+   part.setSd(sd);
+   part.setValues(partValues);
+   client.add_partition(part);
+ }
+
+ /**
+  * Builds the partition path text {@code " ( key1='val1'/key2='val2' )"}
+  * from partition keys and values, joining the pairs with
+  * {@code Path.SEPARATOR}.
+  *
+  * @param partKeys partition key schemas
+  * @param partVals partition values, in the same order as the keys
+  * @return the partition path text
+  * @throws IllegalArgumentException if the two lists differ in size
+  */
+ private static String makePartPath(List<FieldSchema> partKeys, List<String> partVals) {
+   int keyCount = partKeys.size();
+   if (keyCount != partVals.size()) {
+     throw new IllegalArgumentException("Partition values:" + partVals
+         + ", does not match the partition Keys in table :" + partKeys );
+   }
+   StringBuilder path = new StringBuilder(keyCount * 20);
+   path.append(" ( ");
+   for (int idx = 0; idx < keyCount; idx++) {
+     if (idx > 0) {
+       path.append(Path.SEPARATOR);
+     }
+     path.append(partKeys.get(idx).getName())
+         .append("='")
+         .append(partVals.get(idx))
+         .append("'");
+   }
+   path.append(" )");
+   return path.toString();
+ }
+
+
+ /**
+  * Returns the column schema of the test table: an int column named by
+  * COL1 followed by a string column named by COL2.
+  */
+ private static List<FieldSchema> getTableColumns() {
+   FieldSchema idColumn = new FieldSchema(COL1, serdeConstants.INT_TYPE_NAME, "");
+   FieldSchema msgColumn = new FieldSchema(COL2, serdeConstants.STRING_TYPE_NAME, "");
+   List<FieldSchema> columns = new ArrayList<FieldSchema>();
+   columns.add(idColumn);
+   columns.add(msgColumn);
+   return columns;
+ }
+}
Added: hive/branches/branch-0.13/hcatalog/streaming/src/test/sit
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/test/sit?rev=1586190&view=auto
==============================================================================
--- hive/branches/branch-0.13/hcatalog/streaming/src/test/sit (added)
+++ hive/branches/branch-0.13/hcatalog/streaming/src/test/sit Thu Apr 10 01:08:59 2014
@@ -0,0 +1,39 @@
+#!/bin/sh
+
+# Runs StreamingIntegrationTester with a classpath assembled from the Hadoop
+# client jars, the Hive and HCatalog jars and both conf directories.
+# Requires HADOOP_HOME, HIVE_HOME and JAVA_HOME; all script arguments are
+# passed through to the tester.
+
+# "=" / -z are used instead of "==", which is not valid in POSIX test.
+if [ -z "${HADOOP_HOME}" ]
+  then
+  echo "Please set HADOOP_HOME";
+  exit 1
+fi
+
+if [ -z "${HIVE_HOME}" ]
+  then
+  echo "Please set HIVE_HOME";
+  exit 1
+fi
+
+if [ -z "${JAVA_HOME}" ]
+  then
+  echo "Please set JAVA_HOME";
+  exit 1
+fi
+
+# Variable parts are quoted so that paths containing spaces stay intact;
+# the glob itself is left unquoted so it still expands.
+for jar in "${HADOOP_HOME}"/client/*.jar
+  do
+  CLASSPATH=${CLASSPATH}:$jar
+done
+
+for jar in "${HIVE_HOME}"/lib/*.jar
+  do
+  CLASSPATH=${CLASSPATH}:$jar
+done
+
+for jar in "${HIVE_HOME}"/hcatalog/share/hcatalog/*.jar
+  do
+  CLASSPATH=${CLASSPATH}:$jar
+done
+
+CLASSPATH=${CLASSPATH}:${HADOOP_HOME}/conf
+CLASSPATH=${CLASSPATH}:${HIVE_HOME}/conf
+
+# "$@" keeps each argument as a single word.
+"$JAVA_HOME/bin/java" -cp "${CLASSPATH}" org.apache.hive.hcatalog.streaming.StreamingIntegrationTester "$@"
Modified: hive/branches/branch-0.13/packaging/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/packaging/pom.xml?rev=1586190&r1=1586189&r2=1586190&view=diff
==============================================================================
--- hive/branches/branch-0.13/packaging/pom.xml (original)
+++ hive/branches/branch-0.13/packaging/pom.xml Thu Apr 10 01:08:59 2014
@@ -133,6 +133,11 @@
</dependency>
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-streaming</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-hbase-storage-handler</artifactId>
<version>${project.version}</version>
</dependency>
Modified: hive/branches/branch-0.13/packaging/src/main/assembly/bin.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/packaging/src/main/assembly/bin.xml?rev=1586190&r1=1586189&r2=1586190&view=diff
==============================================================================
--- hive/branches/branch-0.13/packaging/src/main/assembly/bin.xml (original)
+++ hive/branches/branch-0.13/packaging/src/main/assembly/bin.xml Thu Apr 10 01:08:59 2014
@@ -52,6 +52,7 @@
<include>org.apache.hive.hcatalog:hive-hcatalog-core</include>
<include>org.apache.hive.hcatalog:hive-hcatalog-pig-adapter</include>
<include>org.apache.hive.hcatalog:hive-hcatalog-server-extensions</include>
+ <include>org.apache.hive.hcatalog:hive-hcatalog-streaming</include>
</includes>
</dependencySet>
<dependencySet>