Posted to commits@hive.apache.org by pr...@apache.org on 2018/04/16 06:35:53 UTC

[2/4] hive git commit: HIVE-19210: Create separate module for streaming ingest (Prasanth Jayachandran reviewed by Eugene Koifman)

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/package.html
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/package.html b/streaming/src/java/org/apache/hive/streaming/package.html
new file mode 100644
index 0000000..2b45792
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/package.html
@@ -0,0 +1,181 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
+        "http://www.w3.org/TR/html4/loose.dtd">
+
+<html lang="en">
+
+<head>
+<meta name=Title content="HCatalog Streaming API">
+<meta name=Keywords content="HCatalog Streaming ACID">
+<meta http-equiv=Content-Type content="text/html; charset=macintosh">
+<title>HCatalog Streaming API</title>
+</head>
+
+<body>
+
+<h1>HCatalog Streaming API -- high level description</h1>
+
+<b>NOTE: The Streaming API feature is provided as a technology
+preview. The API may undergo incompatible changes in upcoming
+releases.</b>
+
+<p>
+Traditionally, adding new data into Hive requires gathering a large
+amount of data onto HDFS and then periodically adding a new
+partition. This is essentially a <i>batch insertion</i>. Insertion of
+new data into an existing partition or table is not done in a way that
+gives consistent results to readers. The Hive Streaming API allows data to
+be pumped continuously into Hive. The incoming data can be
+continuously committed in small batches (of records) into a Hive
+partition. Once data is committed, it becomes immediately visible to
+all Hive queries initiated subsequently.</p>
+
+<p>
+This API is intended for streaming clients such as NiFi, Flume and Storm,
+which continuously generate data. Streaming support is built on top of
+ACID based insert/update support in Hive.</p>
+
+<p>
+The classes and interfaces that make up the Hive streaming API are broadly
+categorized into two sets. The first set provides support for connection
+and transaction management, while the second set provides I/O
+support. Transactions are managed by the Hive MetaStore.  Writes are
+performed to HDFS via Hive wrapper APIs that bypass the MetaStore. </p>
+
+<p>
+<b>Note on packaging</b>: The APIs are defined in the 
+<b>org.apache.hive.streaming</b> Java package and included in
+the hive-streaming jar.</p>
+
+<h2>STREAMING REQUIREMENTS</h2>
+
+<p>
+A few things are currently required to use streaming.
+</p>
+
+<p>
+<ol>
+  <li> Currently, only the ORC storage format is supported, so
+    '<b>stored as orc</b>' must be specified during table creation.</li>
+  <li> The Hive table may be bucketed but must not be sorted. </li>
+  <li> The user of the client streaming process must have the necessary
+    permissions to write to the table or partition and create partitions in
+    the table.</li>
+  <li> Currently, when issuing queries on streaming tables, the query client must set
+    <ol>
+      <li><b>hive.input.format =
+             org.apache.hadoop.hive.ql.io.HiveInputFormat</b></li>
+    </ol></li>
+  The above client settings are a temporary requirement and the intention is to
+  drop the need for them in the near future.
+  <li> Settings required in hive-site.xml for the Metastore (a sketch follows this list):
+    <ol>
+      <li><b>hive.txn.manager =
+	org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</b></li>
+      <li><b>hive.support.concurrency = true </b> </li>
+      <li><b>hive.compactor.initiator.on = true</b> </li>
+      <li><b>hive.compactor.worker.threads > 0 </b> </li>
+    </ol></li>
+</ol></p>
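+
+<p>
+For illustration only, a sketch of the Metastore settings above expressed as
+properties on a Java <b>HiveConf</b> (the property names and values are exactly
+those listed above; in a real deployment they are configured in the Metastore's
+hive-site.xml rather than in client code):</p>
+
+<pre>
+HiveConf conf = new HiveConf();   // org.apache.hadoop.hive.conf.HiveConf
+// Transaction support required by streaming ingest
+conf.set("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+conf.set("hive.support.concurrency", "true");
+// Compaction merges the small delta files produced by streaming into larger files
+conf.set("hive.compactor.initiator.on", "true");
+conf.set("hive.compactor.worker.threads", "1");
+</pre>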
+
+<p>
+<b>Note:</b> Streaming to <b>unpartitioned</b> tables is also
+supported.</p>
+
+<h2>Transaction and Connection management</h2>
+
+<p>
+The class <a href="HiveEndPoint.html"><b>HiveEndPoint</b></a> is a Hive end
+point to connect to. An endpoint is either a Hive table or
+partition. An endpoint is cheap to create and does not internally hold
+on to any network connections.  Invoking the newConnection method on
+it creates a new connection to the Hive MetaStore for streaming
+purposes.  It returns a
+<a href="StreamingConnection.html"><b>StreamingConnection</b></a>
+object. Multiple connections can be established on the same
+endpoint. StreamingConnection can then be used to initiate new
+transactions for performing I/O. </p>
+
+<h3>Dynamic Partition Creation:</h3> In a setup where data is being
+streamed continuously (e.g. from Flume), it is often desirable to have
+new partitions created automatically (say, on an hourly basis). In such
+cases requiring the Hive admin to pre-create the necessary partitions
+may not be reasonable.  Consequently the streaming API allows
+streaming clients to create partitions as
+needed. <b>HiveEndPoint.newConnection()</b> accepts an argument to
+indicate if the partition should be auto-created. Since partition
+creation is an atomic action, multiple clients can race to create the
+partition, but only one will succeed, so streaming clients need not
+synchronize when creating a partition. The user of the client process
+needs to be given write permissions on the Hive table in order to
+create partitions.
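+
+<p>
+A minimal sketch of establishing a connection for streaming (the metastore
+URI, database, table and partition value below are placeholders, the argument
+order of the <b>HiveEndPoint</b> constructor is an assumption to be checked
+against its javadoc, and exception handling is omitted):</p>
+
+<pre>
+// End point for one partition of table 'alerts' in database 'testdb'
+HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083", "testdb",
+    "alerts", Arrays.asList("2018-04-16"));   // java.util.Arrays
+// 'true' asks the API to auto-create the partition if it does not exist yet
+StreamingConnection connection = endPt.newConnection(true);
+</pre>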
+
+<h3>Batching Transactions:</h3> Transactions are implemented slightly
+differently than in traditional database systems. Multiple transactions
+are grouped into a <i>Transaction Batch</i> and each transaction has
+an id. Data from each transaction batch is written to a single file on HDFS,
+which eventually gets compacted with other files into a larger file
+automatically for efficiency.
+
+<h3>Basic Steps:</h3> After a connection is established, a streaming
+client first requests a new batch of transactions. In response it
+receives a set of transaction ids that are part of the transaction
+batch. Subsequently the client proceeds to consume one transaction at
+a time by initiating new transactions. The client will write() one or more
+records per transaction and either commit or abort the current
+transaction before switching to the next one. Each
+<b>TransactionBatch.write()</b> invocation automatically associates
+the I/O attempt with the current transaction id. The user of the
+streaming client needs to have write permissions to the partition or
+table.</p>
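+
+<p>
+A sketch of this write loop, reusing <b>endPt</b> and <b>connection</b> from
+the sketch above together with a <b>DelimitedInputWriter</b> over
+comma-separated records (described under Writing Data below). The exact
+method signatures are assumptions; consult the linked javadoc:</p>
+
+<pre>
+DelimitedInputWriter writer =
+    new DelimitedInputWriter(new String[]{"id", "msg"}, ",", endPt);
+// '10' is a hint for how many transactions to allocate in this batch
+TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+try {
+  while (txnBatch.remainingTransactions() > 0) {
+    txnBatch.beginNextTransaction();
+    txnBatch.write("1,hello streaming".getBytes());
+    txnBatch.write("2,welcome to streaming".getBytes());
+    txnBatch.commit();              // or txnBatch.abort() on failure
+  }
+} finally {
+  txnBatch.close();
+}
+</pre>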
+
+<p>
+<b>Concurrency Note:</b> I/O can be performed on multiple
+<b>TransactionBatch</b>es concurrently. However, the transactions within a
+transaction batch must be consumed sequentially.</p>
+
+<h2>Writing Data</h2>
+
+<p>
+These classes and interfaces provide support for writing the data to
+Hive within a transaction. 
+<a href="RecordWriter.html"><b>RecordWriter</b></a> is the interface
+implemented by all writers. A writer is responsible for taking a
+record in the form of a <b>byte[]</b> containing data in a known
+format (e.g. CSV) and writing it out in the format supported by Hive
+streaming. A <b>RecordWriter</b> may reorder or drop fields from the incoming
+record if necessary to map them to the corresponding columns in the
+Hive Table. A streaming client will instantiate an appropriate
+<b>RecordWriter</b> type and pass it to 
+<b>StreamingConnection.fetchTransactionBatch()</b>.  The streaming client 
+does not directly interact with the <b>RecordWriter</b> thereafter, but
+relies on the <b>TransactionBatch</b> to do so.</p>
+
+<p>
+Currently, out of the box, the streaming API provides three
+implementations of the <b>RecordWriter</b> interface: one handles delimited
+input data (such as CSV, tab separated, etc.), one handles JSON
+(strict syntax), and one handles text input matched against a regex. Support
+for other input formats can be provided by additional implementations of the
+<b>RecordWriter</b> interface.
+<ul>
+<li> <a href="DelimitedInputWriter.html"><b>DelimitedInputWriter</b></a> 
+- Delimited text input.</li>
+<li> <a href="StrictJsonWriter.html"><b>StrictJsonWriter</b></a> 
+- JSON text input.</li>
+  <li> <a href="StrictRegexWriter.html"><b>StrictRegexWriter</b></a>
+    - text input with regex.</li>
+</ul></p>
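+
+<p>
+For example, a sketch of writing a single JSON record with
+<b>StrictJsonWriter</b>, reusing <b>endPt</b> and <b>connection</b> from the
+sketches above and assuming the JSON field names match the table's column
+names (the constructor signature is an assumption; see its javadoc):</p>
+
+<pre>
+StrictJsonWriter jsonWriter = new StrictJsonWriter(endPt);
+TransactionBatch jsonBatch = connection.fetchTransactionBatch(10, jsonWriter);
+jsonBatch.beginNextTransaction();
+jsonBatch.write("{\"id\" : 1, \"msg\" : \"hello\"}".getBytes());
+jsonBatch.commit();
+jsonBatch.close();
+</pre>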
+
+<h2>Performance, Concurrency, Etc.</h2>
+<p>
+  Each StreamingConnection writes data at the rate the underlying
+  FileSystem can accept it.  If that is not sufficient, multiple StreamingConnection objects can
+  be created concurrently.
+</p>
+<p>
+  Each StreamingConnection can have at most 1 outstanding TransactionBatch and each TransactionBatch
+  may have at most 2 threads operating on it.
+  See <a href="TransactionBatch.html"><b>TransactionBatch</b></a>.
+</p>
+</body>
+
+</html>

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java b/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
new file mode 100644
index 0000000..f0843a1
--- /dev/null
+++ b/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.Assert;
+
+public class TestDelimitedInputWriter {
+  @Test
+  public void testFieldReordering() throws Exception {
+
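+    // getFieldReordering() maps each incoming field position to the index of the
+    // matching column in colNames; a null field name maps to -1 (field is dropped)
+    // and an unknown field name raises InvalidColumn.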
+    ArrayList<String> colNames = Lists.newArrayList(new String[]{"col1", "col2", "col3", "col4", "col5"});
+    {//1)  test dropping fields - first, middle & last
+      String[] fieldNames = {null, "col2", null, "col4", null};
+      int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+      Assert.assertTrue(Arrays.equals(mapping, new int[]{-1, 1, -1, 3, -1}));
+    }
+
+    {//2)  test reordering
+      String[] fieldNames = {"col5", "col4", "col3", "col2", "col1"};
+      int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+      Assert.assertTrue( Arrays.equals(mapping, new int[]{4,3,2,1,0}) );
+    }
+
+    {//3)  test bad field names
+      String[] fieldNames = {"xyz", "abc", "col3", "col4", "as"};
+      try {
+        DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+        Assert.fail();
+      } catch (InvalidColumn e) {
+        // should throw
+      }
+    }
+
+    {//4)  test few field names
+      String[] fieldNames = {"col3", "col4"};
+      int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+      Assert.assertTrue( Arrays.equals(mapping, new int[]{2,3}) );
+    }
+
+    {//5)  test extra field names
+      String[] fieldNames = {"col5", "col4", "col3", "col2", "col1", "col1"};
+      try {
+        DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+        Assert.fail();
+      } catch (InvalidColumn e) {
+        // should throw
+      }
+    }
+  }
+}