Posted to commits@hive.apache.org by pr...@apache.org on 2018/04/16 06:35:55 UTC
[4/4] hive git commit: HIVE-19210: Create separate module for streaming ingest (Prasanth Jayachandran reviewed by Eugene Koifman)
HIVE-19210: Create separate module for streaming ingest (Prasanth Jayachandran reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6bd32a0d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6bd32a0d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6bd32a0d
Branch: refs/heads/master
Commit: 6bd32a0d6a5dbcd0553ff88fd6c7b9a653e6e1eb
Parents: 16d94fb
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Sun Apr 15 23:34:12 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Sun Apr 15 23:34:12 2018 -0700
----------------------------------------------------------------------
.../hive/hcatalog/streaming/package-info.java | 19 +
itests/hive-unit/pom.xml | 4 +-
.../hive/ql/txn/compactor/TestCompactor.java | 10 +-
packaging/pom.xml | 5 +
packaging/src/main/assembly/src.xml | 1 +
pom.xml | 1 +
streaming/pom.xml | 141 ++
.../hive/streaming/AbstractRecordWriter.java | 324 +++
.../apache/hive/streaming/ConnectionError.java | 35 +
.../hive/streaming/DelimitedInputWriter.java | 331 +++
.../apache/hive/streaming/HeartBeatFailure.java | 33 +
.../org/apache/hive/streaming/HiveEndPoint.java | 1117 +++++++++
.../hive/streaming/ImpersonationFailed.java | 25 +
.../apache/hive/streaming/InvalidColumn.java | 26 +
.../apache/hive/streaming/InvalidPartition.java | 28 +
.../org/apache/hive/streaming/InvalidTable.java | 38 +
.../hive/streaming/InvalidTrasactionState.java | 26 +
.../hive/streaming/PartitionCreationFailed.java | 25 +
.../hive/streaming/QueryFailedException.java | 28 +
.../org/apache/hive/streaming/RecordWriter.java | 43 +
.../hive/streaming/SerializationError.java | 26 +
.../hive/streaming/StreamingConnection.java | 57 +
.../hive/streaming/StreamingException.java | 28 +
.../hive/streaming/StreamingIOFailure.java | 31 +
.../apache/hive/streaming/StrictJsonWriter.java | 162 ++
.../hive/streaming/StrictRegexWriter.java | 189 ++
.../apache/hive/streaming/TransactionBatch.java | 125 +
.../streaming/TransactionBatchUnAvailable.java | 25 +
.../apache/hive/streaming/TransactionError.java | 29 +
.../java/org/apache/hive/streaming/package.html | 181 ++
.../streaming/TestDelimitedInputWriter.java | 73 +
.../apache/hive/streaming/TestStreaming.java | 2330 ++++++++++++++++++
32 files changed, 5509 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
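The commit relocates the streaming ingest API from org.apache.hive.hcatalog.streaming to a new org.apache.hive.streaming package in its own hive-streaming module; only the package and Maven coordinates change, the connection/transaction-batch lifecycle stays the same. Below is a minimal client sketch against the relocated classes. The metastore URI, table, partition values, and field names are placeholders, and it assumes newConnection/fetchTransactionBatch keep the signatures of the hcatalog version this module was split from.

import java.util.Arrays;

import org.apache.hive.streaming.DelimitedInputWriter;
import org.apache.hive.streaming.HiveEndPoint;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.TransactionBatch;

public class StreamingIngestSketch {
  public static void main(String[] args) throws Exception {
    // Endpoint for a transactional (ACID) target table; partition values may be
    // empty for an unpartitioned table. All names here are placeholders.
    HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083",
        "default", "alerts", Arrays.asList("2018", "04"));
    StreamingConnection conn = endPt.newConnection(true, null, "example-agent");
    try {
      // The writer parses delimited records and reorders fields to match the table's columns.
      DelimitedInputWriter writer =
          new DelimitedInputWriter(new String[]{"id", "msg"}, ",", endPt, null, conn);
      TransactionBatch txnBatch = conn.fetchTransactionBatch(10, writer);
      try {
        txnBatch.beginNextTransaction();
        txnBatch.write("1,hello".getBytes("UTF-8"));
        txnBatch.write("2,world".getBytes("UTF-8"));
        txnBatch.commit();
      } finally {
        txnBatch.close();
      }
    } finally {
      conn.close();
    }
  }
}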
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java
new file mode 100644
index 0000000..36d6b13
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@Deprecated // use org.apache.hive.streaming instead
+package org.apache.hive.hcatalog.streaming;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/itests/hive-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index 05c362e..3ae7f2f 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -76,8 +76,8 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.hive.hcatalog</groupId>
- <artifactId>hive-hcatalog-streaming</artifactId>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-streaming</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 5966740..b19aa23 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -70,11 +70,11 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hive.hcatalog.common.HCatUtil;
-import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
-import org.apache.hive.hcatalog.streaming.HiveEndPoint;
-import org.apache.hive.hcatalog.streaming.StreamingConnection;
-import org.apache.hive.hcatalog.streaming.StreamingException;
-import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.apache.hive.streaming.DelimitedInputWriter;
+import org.apache.hive.streaming.HiveEndPoint;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.StreamingException;
+import org.apache.hive.streaming.TransactionBatch;
import org.apache.orc.OrcConf;
import org.junit.After;
import org.junit.Assert;
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/packaging/pom.xml
----------------------------------------------------------------------
diff --git a/packaging/pom.xml b/packaging/pom.xml
index e2d61bd..fe1aac8 100644
--- a/packaging/pom.xml
+++ b/packaging/pom.xml
@@ -258,6 +258,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-streaming</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-streaming</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/packaging/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/packaging/src/main/assembly/src.xml b/packaging/src/main/assembly/src.xml
index 486fe52..c477194 100644
--- a/packaging/src/main/assembly/src.xml
+++ b/packaging/src/main/assembly/src.xml
@@ -97,6 +97,7 @@
<include>spark-client/**/*</include>
<include>storage-api/**/*</include>
<include>standalone-metastore/**/*</include>
+ <include>streaming/**/*</include>
<include>testutils/**/*</include>
<include>vector-code-gen/**/*</include>
<include>kryo-registrator/**/*</include>
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2d30789..6c43181 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,7 @@
<module>serde</module>
<module>service-rpc</module>
<module>service</module>
+ <module>streaming</module>
<module>llap-common</module>
<module>llap-client</module>
<module>llap-ext-client</module>
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/streaming/pom.xml b/streaming/pom.xml
new file mode 100644
index 0000000..b58ec01
--- /dev/null
+++ b/streaming/pom.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive</artifactId>
+ <version>3.1.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>hive-streaming</artifactId>
+ <packaging>jar</packaging>
+ <name>Hive Streaming</name>
+
+ <properties>
+ <hive.path.to.root>..</hive.path.to.root>
+ </properties>
+
+ <dependencies>
+ <!-- dependencies are always listed in sorted order by groupId, artifactId -->
+ <!-- intra-project -->
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-cli</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-core</artifactId>
+ <optional>true</optional>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <optional>true</optional>
+ <version>3.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <optional>true</optional>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <optional>true</optional>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- test -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <scope>test</scope>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <sourceDirectory>${basedir}/src/java</sourceDirectory>
+ <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+ <resources>
+ </resources>
+ <plugins>
+ <!-- plugins are always listed in sorted order by groupId, artifactId -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
new file mode 100644
index 0000000..25998ae
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+
+
+public abstract class AbstractRecordWriter implements RecordWriter {
+ static final private Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName());
+
+ private final HiveConf conf;
+ private final HiveEndPoint endPoint;
+ final Table tbl;
+
+ private final IMetaStoreClient msClient;
+ final List<Integer> bucketIds;
+ private ArrayList<RecordUpdater> updaters = null;
+
+ private final int totalBuckets;
+ /**
+ * Indicates whether target table is bucketed
+ */
+ private final boolean isBucketed;
+
+ private final Path partitionPath;
+
+ private final AcidOutputFormat<?,?> outf;
+ private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write.
+ private Long curBatchMinWriteId;
+ private Long curBatchMaxWriteId;
+
+ private static final class TableWriterPair {
+ private final Table tbl;
+ private final Path partitionPath;
+ TableWriterPair(Table t, Path p) {
+ tbl = t;
+ partitionPath = p;
+ }
+ }
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #AbstractRecordWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+ */
+ protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
+ throws ConnectionError, StreamingException {
+ this(endPoint, conf, null);
+ }
+ protected AbstractRecordWriter(HiveEndPoint endPoint2, HiveConf conf, StreamingConnection conn)
+ throws StreamingException {
+ this.endPoint = endPoint2;
+ this.conf = conf!=null ? conf
+ : HiveEndPoint.createHiveConf(DelimitedInputWriter.class, endPoint.metaStoreUri);
+ try {
+ msClient = HCatUtil.getHiveMetastoreClient(this.conf);
+ UserGroupInformation ugi = conn != null ? conn.getUserGroupInformation() : null;
+ if (ugi == null) {
+ this.tbl = msClient.getTable(endPoint.database, endPoint.table);
+ this.partitionPath = getPathForEndPoint(msClient, endPoint);
+ } else {
+ TableWriterPair twp = ugi.doAs(
+ new PrivilegedExceptionAction<TableWriterPair>() {
+ @Override
+ public TableWriterPair run() throws Exception {
+ return new TableWriterPair(msClient.getTable(endPoint.database, endPoint.table),
+ getPathForEndPoint(msClient, endPoint));
+ }
+ });
+ this.tbl = twp.tbl;
+ this.partitionPath = twp.partitionPath;
+ }
+ this.isBucketed = tbl.getSd().getNumBuckets() > 0;
+ /**
+ * For unbucketed tables we have exactly one RecordUpdater per AbstractRecordWriter, which
+ * ends up writing to a file bucket_000000.
+ * See also {@link #getBucket(Object)}
+ */
+ this.totalBuckets = isBucketed ? tbl.getSd().getNumBuckets() : 1;
+ if(isBucketed) {
+ this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
+ this.bucketFieldData = new Object[bucketIds.size()];
+ }
+ else {
+ bucketIds = Collections.emptyList();
+ }
+ String outFormatName = this.tbl.getSd().getOutputFormat();
+ outf = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
+ } catch(InterruptedException e) {
+ throw new StreamingException(endPoint2.toString(), e);
+ } catch (MetaException | NoSuchObjectException e) {
+ throw new ConnectionError(endPoint2, e);
+ } catch (TException | ClassNotFoundException | IOException e) {
+ throw new StreamingException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Used to tag error messages to provide some breadcrumbs.
+ */
+ String getWatermark() {
+ return partitionPath + " writeIds[" + curBatchMinWriteId + "," + curBatchMaxWriteId + "]";
+ }
+ // return the column numbers of the bucketed columns
+ private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
+ ArrayList<Integer> result = new ArrayList<Integer>(bucketCols.size());
+ HashSet<String> bucketSet = new HashSet<String>(bucketCols);
+ for (int i = 0; i < cols.size(); i++) {
+ if( bucketSet.contains(cols.get(i).getName()) ) {
+ result.add(i);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Get the SerDe for the Objects created by {@link #encode}. This is public so that test
+ * frameworks can use it.
+ * @return serde
+ * @throws SerializationError
+ */
+ public abstract AbstractSerDe getSerde() throws SerializationError;
+
+ /**
+ * Encode a record as an Object that Hive can read with the ObjectInspector associated with the
+ * serde returned by {@link #getSerde}. This is public so that test frameworks can use it.
+ * @param record record to be deserialized
+ * @return deserialized record as an Object
+ * @throws SerializationError
+ */
+ public abstract Object encode(byte[] record) throws SerializationError;
+
+ protected abstract ObjectInspector[] getBucketObjectInspectors();
+ protected abstract StructObjectInspector getRecordObjectInspector();
+ protected abstract StructField[] getBucketStructFields();
+
+ // returns the bucket number to which the record belongs
+ protected int getBucket(Object row) throws SerializationError {
+ if(!isBucketed) {
+ return 0;
+ }
+ ObjectInspector[] inspectors = getBucketObjectInspectors();
+ Object[] bucketFields = getBucketFields(row);
+ return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets);
+ }
+
+ @Override
+ public void flush() throws StreamingIOFailure {
+ try {
+ for (RecordUpdater updater : updaters) {
+ if (updater != null) {
+ updater.flush();
+ }
+ }
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Unable to flush recordUpdater", e);
+ }
+ }
+
+ @Override
+ public void clear() throws StreamingIOFailure {
+ }
+
+ /**
+ * Creates a new record updater for the new batch
+ * @param minWriteId smallest writeid in the batch
+ * @param maxWriteID largest writeid in the batch
+ * @throws StreamingIOFailure if failed to create record updater
+ */
+ @Override
+ public void newBatch(Long minWriteId, Long maxWriteID)
+ throws StreamingIOFailure, SerializationError {
+ curBatchMinWriteId = minWriteId;
+ curBatchMaxWriteId = maxWriteID;
+ updaters = new ArrayList<RecordUpdater>(totalBuckets);
+ for (int bucket = 0; bucket < totalBuckets; bucket++) {
+ updaters.add(bucket, null); // so that get(i) returns null rather than throwing IndexOutOfBoundsException
+ }
+ }
+
+ @Override
+ public void closeBatch() throws StreamingIOFailure {
+ boolean haveError = false;
+ for (RecordUpdater updater : updaters) {
+ if (updater != null) {
+ try {
+ //try not to leave any files open
+ updater.close(false);
+ } catch (Exception ex) {
+ haveError = true;
+ LOG.error("Unable to close " + updater + " due to: " + ex.getMessage(), ex);
+ }
+ }
+ }
+ updaters.clear();
+ if(haveError) {
+ throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark());
+ }
+ }
+
+ protected static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
+ , StructObjectInspector recordObjInspector)
+ throws SerializationError {
+ ObjectInspector[] result = new ObjectInspector[bucketIds.size()];
+
+ for (int i = 0; i < bucketIds.size(); i++) {
+ int bucketId = bucketIds.get(i);
+ result[i] =
+ recordObjInspector.getAllStructFieldRefs().get( bucketId ).getFieldObjectInspector();
+ }
+ return result;
+ }
+
+
+ private Object[] getBucketFields(Object row) throws SerializationError {
+ StructObjectInspector recordObjInspector = getRecordObjectInspector();
+ StructField[] bucketStructFields = getBucketStructFields();
+ for (int i = 0; i < bucketIds.size(); i++) {
+ bucketFieldData[i] = recordObjInspector.getStructFieldData(row, bucketStructFields[i]);
+ }
+ return bucketFieldData;
+ }
+
+ private RecordUpdater createRecordUpdater(int bucketId, Long minWriteId, Long maxWriteID)
+ throws IOException, SerializationError {
+ try {
+ // Initialize table properties from the table parameters. This is required because the table
+ // may define certain table parameters that may be required while writing. The table parameter
+ // 'transactional_properties' is one such example.
+ Properties tblProperties = new Properties();
+ tblProperties.putAll(tbl.getParameters());
+ return outf.getRecordUpdater(partitionPath,
+ new AcidOutputFormat.Options(conf)
+ .inspector(getSerde().getObjectInspector())
+ .bucket(bucketId)
+ .tableProperties(tblProperties)
+ .minimumWriteId(minWriteId)
+ .maximumWriteId(maxWriteID)
+ .statementId(-1)
+ .finalDestination(partitionPath));
+ } catch (SerDeException e) {
+ throw new SerializationError("Failed to get object inspector from Serde "
+ + getSerde().getClass().getName(), e);
+ }
+ }
+
+ RecordUpdater getRecordUpdater(int bucketId) throws StreamingIOFailure, SerializationError {
+ RecordUpdater recordUpdater = updaters.get(bucketId);
+ if (recordUpdater == null) {
+ try {
+ recordUpdater = createRecordUpdater(bucketId, curBatchMinWriteId, curBatchMaxWriteId);
+ } catch (IOException e) {
+ String errMsg = "Failed creating RecordUpdater for " + getWatermark();
+ LOG.error(errMsg, e);
+ throw new StreamingIOFailure(errMsg, e);
+ }
+ updaters.set(bucketId, recordUpdater);
+ }
+ return recordUpdater;
+ }
+
+ private Path getPathForEndPoint(IMetaStoreClient msClient, HiveEndPoint endPoint)
+ throws StreamingException {
+ try {
+ String location;
+ if(endPoint.partitionVals==null || endPoint.partitionVals.isEmpty() ) {
+ location = msClient.getTable(endPoint.database,endPoint.table)
+ .getSd().getLocation();
+ } else {
+ location = msClient.getPartition(endPoint.database, endPoint.table,
+ endPoint.partitionVals).getSd().getLocation();
+ }
+ return new Path(location);
+ } catch (TException e) {
+ throw new StreamingException(e.getMessage()
+ + ". Unable to get path for end point: "
+ + endPoint.partitionVals, e);
+ }
+ }
+}
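For bucketed tables, getBucket() above routes each encoded row with ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets). The standalone sketch below is not part of this commit; it only illustrates that routing, using the same serde2 ObjectInspector APIs the writer already calls, for a single string bucketing column mapped onto four buckets.

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class BucketRoutingSketch {
  public static void main(String[] args) {
    // One bucketing column of type string, as a bucketed target table might declare.
    ObjectInspector[] inspectors = {
        PrimitiveObjectInspectorFactory.javaStringObjectInspector };
    int totalBuckets = 4;
    for (String key : new String[]{"alice", "bob", "carol"}) {
      // Same call AbstractRecordWriter.getBucket() makes for each encoded row.
      int bucket = ObjectInspectorUtils.getBucketNumber(
          new Object[]{key}, inspectors, totalBuckets);
      System.out.println(key + " -> bucket " + bucket);
    }
  }
}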
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/ConnectionError.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/ConnectionError.java b/streaming/src/java/org/apache/hive/streaming/ConnectionError.java
new file mode 100644
index 0000000..668bffb
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/ConnectionError.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class ConnectionError extends StreamingException {
+
+ public ConnectionError(String msg) {
+ super(msg);
+ }
+
+ public ConnectionError(String msg, Exception innerEx) {
+ super(msg, innerEx);
+ }
+
+ public ConnectionError(HiveEndPoint endPoint, Exception innerEx) {
+ super("Error connecting to " + endPoint +
+ (innerEx == null ? "" : ": " + innerEx.getMessage()), innerEx);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java b/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java
new file mode 100644
index 0000000..898b3f9
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Streaming writer that handles delimited input (e.g. CSV).
+ * Delimited input is parsed and reordered to match the column order in the table.
+ * Uses LazySimpleSerDe to process the delimited input.
+ */
+public class DelimitedInputWriter extends AbstractRecordWriter {
+ private final boolean reorderingNeeded;
+ private String delimiter;
+ private char serdeSeparator;
+ private int[] fieldToColMapping;
+ private final ArrayList<String> tableColumns;
+ private LazySimpleSerDe serde = null;
+
+ private final LazySimpleStructObjectInspector recordObjInspector;
+ private final ObjectInspector[] bucketObjInspectors;
+ private final StructField[] bucketStructFields;
+
+ static final private Logger LOG = LoggerFactory.getLogger(DelimitedInputWriter.class.getName());
+
+ /** Constructor. Uses default separator of the LazySimpleSerde
+ * @param colNamesForFields Column name assignment for input fields. Null or empty
+ * strings in the array indicate fields to be skipped
+ * @param delimiter input field delimiter
+ * @param endPoint Hive endpoint
+ * @throws ConnectionError Problem talking to Hive
+ * @throws ClassNotFoundException Serde class not found
+ * @throws SerializationError Serde initialization/interaction failed
+ * @throws StreamingException Problem acquiring file system path for partition
+ * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint, StreamingConnection conn)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ this(colNamesForFields, delimiter, endPoint, null, conn);
+ }
+ /** Constructor. Uses default separator of the LazySimpleSerde
+ * @param colNamesForFields Column name assignment for input fields. Null or empty
+ * strings in the array indicate fields to be skipped
+ * @param delimiter input field delimiter
+ * @param endPoint Hive endpoint
+ * @param conf a Hive conf object. Can be null if not using advanced hive settings.
+ * @throws ConnectionError Problem talking to Hive
+ * @throws ClassNotFoundException Serde class not found
+ * @throws SerializationError Serde initialization/interaction failed
+ * @throws StreamingException Problem acquiring file system path for partition
+ * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ this(colNamesForFields, delimiter, endPoint, conf,
+ (char) LazySerDeParameters.DefaultSeparators[0], conn);
+ }
+ /**
+ * Constructor. Allows overriding separator of the LazySimpleSerde
+ * @param colNamesForFields Column name assignment for input fields
+ * @param delimiter input field delimiter
+ * @param endPoint Hive endpoint
+ * @param conf a Hive conf object. Set to null if not using advanced hive settings.
+ * @param serdeSeparator separator used when encoding data that is fed into the
+ * LazySimpleSerde. Ensure this separator does not occur
+ * in the field data
+ * @param conn connection this Writer is to be used with
+ * @throws ConnectionError Problem talking to Hive
+ * @throws ClassNotFoundException Serde class not found
+ * @throws SerializationError Serde initialization/interaction failed
+ * @throws StreamingException Problem acquiring file system path for partition
+ * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint, HiveConf conf, char serdeSeparator, StreamingConnection conn)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ super(endPoint, conf, conn);
+ this.tableColumns = getCols(tbl);
+ this.serdeSeparator = serdeSeparator;
+ this.delimiter = delimiter;
+ this.fieldToColMapping = getFieldReordering(colNamesForFields, getTableColumns());
+ this.reorderingNeeded = isReorderingNeeded(delimiter, getTableColumns());
+ LOG.debug("Field reordering needed = " + this.reorderingNeeded + ", for endpoint " + endPoint);
+ this.serdeSeparator = serdeSeparator;
+ this.serde = createSerde(tbl, conf, serdeSeparator);
+
+ // get ObjInspectors for entire record and bucketed cols
+ try {
+ this.recordObjInspector = (LazySimpleStructObjectInspector) serde.getObjectInspector();
+ this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+ }
+
+ // get StructFields for bucketed cols
+ bucketStructFields = new StructField[bucketIds.size()];
+ List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+ for (int i = 0; i < bucketIds.size(); i++) {
+ bucketStructFields[i] = allFields.get(bucketIds.get(i));
+ }
+ }
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, StreamingConnection)}
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ this(colNamesForFields, delimiter, endPoint, null, null);
+ }
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, StreamingConnection)}
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint, HiveConf conf)
+ throws ClassNotFoundException, ConnectionError, SerializationError,
+ InvalidColumn, StreamingException {
+ this(colNamesForFields, delimiter, endPoint, conf,
+ (char) LazySerDeParameters.DefaultSeparators[0], null);
+ }
+ /**
+ * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, char, StreamingConnection)}
+ */
+ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+ HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
+ throws ClassNotFoundException, StreamingException {
+ this(colNamesForFields, delimiter, endPoint, conf, serdeSeparator, null);
+ }
+
+ private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) {
+ return !( delimiter.equals(String.valueOf(getSerdeSeparator()))
+ && areFieldsInColOrder(fieldToColMapping)
+ && tableColumns.size()>=fieldToColMapping.length );
+ }
+
+ private static boolean areFieldsInColOrder(int[] fieldToColMapping) {
+ for(int i=0; i<fieldToColMapping.length; ++i) {
+ if(fieldToColMapping[i]!=i) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @VisibleForTesting
+ static int[] getFieldReordering(String[] colNamesForFields, List<String> tableColNames)
+ throws InvalidColumn {
+ int[] result = new int[ colNamesForFields.length ];
+ for(int i=0; i<colNamesForFields.length; ++i) {
+ result[i] = -1;
+ }
+ int i=-1, fieldLabelCount=0;
+ for( String col : colNamesForFields ) {
+ ++i;
+ if(col == null) {
+ continue;
+ }
+ if( col.trim().isEmpty() ) {
+ continue;
+ }
+ ++fieldLabelCount;
+ int loc = tableColNames.indexOf(col);
+ if(loc == -1) {
+ throw new InvalidColumn("Column '" + col + "' not found in table for input field " + i+1);
+ }
+ result[i] = loc;
+ }
+ if(fieldLabelCount>tableColNames.size()) {
+ throw new InvalidColumn("Number of field names exceeds the number of columns in table");
+ }
+ return result;
+ }
+
+ // Reorder fields in record based on the order of columns in the table
+ protected byte[] reorderFields(byte[] record) throws UnsupportedEncodingException {
+ if(!reorderingNeeded) {
+ return record;
+ }
+ String[] reorderedFields = new String[getTableColumns().size()];
+ String decoded = new String(record);
+ String[] fields = decoded.split(delimiter,-1);
+ for (int i=0; i<fieldToColMapping.length; ++i) {
+ int newIndex = fieldToColMapping[i];
+ if(newIndex != -1) {
+ reorderedFields[newIndex] = fields[i];
+ }
+ }
+ return join(reorderedFields, getSerdeSeparator());
+ }
+
+ // handles nulls in items[]
+ // TODO: perhaps can be made more efficient by creating a byte[] directly
+ private static byte[] join(String[] items, char separator) {
+ StringBuilder buff = new StringBuilder(100);
+ if(items.length == 0)
+ return "".getBytes();
+ int i=0;
+ for(; i<items.length-1; ++i) {
+ if(items[i]!=null) {
+ buff.append(items[i]);
+ }
+ buff.append(separator);
+ }
+ if(items[i]!=null) {
+ buff.append(items[i]);
+ }
+ return buff.toString().getBytes();
+ }
+
+ protected ArrayList<String> getTableColumns() {
+ return tableColumns;
+ }
+
+ @Override
+ public void write(long writeId, byte[] record)
+ throws SerializationError, StreamingIOFailure {
+ try {
+ byte[] orderedFields = reorderFields(record);
+ Object encodedRow = encode(orderedFields);
+ int bucket = getBucket(encodedRow);
+ getRecordUpdater(bucket).insert(writeId, encodedRow);
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Error writing record in transaction write id ("
+ + writeId + ")", e);
+ }
+ }
+
+ @Override
+ public AbstractSerDe getSerde() {
+ return serde;
+ }
+
+ protected LazySimpleStructObjectInspector getRecordObjectInspector() {
+ return recordObjInspector;
+ }
+
+ @Override
+ protected StructField[] getBucketStructFields() {
+ return bucketStructFields;
+ }
+
+ protected ObjectInspector[] getBucketObjectInspectors() {
+ return bucketObjInspectors;
+ }
+
+ @Override
+ public Object encode(byte[] record) throws SerializationError {
+ try {
+ BytesWritable blob = new BytesWritable();
+ blob.set(record, 0, record.length);
+ return serde.deserialize(blob);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to convert byte[] record into Object", e);
+ }
+ }
+
+ /**
+ * Creates the LazySimpleSerDe used to deserialize delimited records.
+ * @param tbl table whose metadata is used to configure the serde
+ * @return the initialized serde
+ * @throws SerializationError if serde could not be initialized
+ */
+ protected static LazySimpleSerDe createSerde(Table tbl, HiveConf conf, char serdeSeparator)
+ throws SerializationError {
+ try {
+ Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+ tableProps.setProperty("field.delim", String.valueOf(serdeSeparator));
+ LazySimpleSerDe serde = new LazySimpleSerDe();
+ SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+ return serde;
+ } catch (SerDeException e) {
+ throw new SerializationError("Error initializing serde", e);
+ }
+ }
+
+ private ArrayList<String> getCols(Table table) {
+ List<FieldSchema> cols = table.getSd().getCols();
+ ArrayList<String> colNames = new ArrayList<String>(cols.size());
+ for (FieldSchema col : cols) {
+ colNames.add(col.getName().toLowerCase());
+ }
+ return colNames;
+ }
+
+ public char getSerdeSeparator() {
+ return serdeSeparator;
+ }
+}
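As documented on the constructors above, colNamesForFields maps each delimited input field to a table column by name, and null or empty entries drop that field. A short sketch, assuming an endpoint and connection created as in the earlier example and a hypothetical table declared with columns (msg string, host string), so that three-field input records are ingested with the middle field skipped and the remaining fields reordered:

import org.apache.hive.streaming.DelimitedInputWriter;
import org.apache.hive.streaming.HiveEndPoint;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StreamingException;

public class ReorderingWriterSketch {
  /**
   * Input records look like "web01,DEBUG,disk full"; the target table is assumed to
   * declare columns (msg string, host string), so field 2 is skipped and fields 1 and 3
   * are reordered to match the table's column order.
   */
  public static DelimitedInputWriter createWriter(HiveEndPoint endPt, StreamingConnection conn)
      throws ClassNotFoundException, StreamingException {
    String[] colNamesForFields = {"host", null, "msg"};
    return new DelimitedInputWriter(colNamesForFields, ",", endPt, null, conn);
  }
}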
http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java b/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java
new file mode 100644
index 0000000..b1f9520
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.util.Collection;
+import java.util.Set;
+
+public class HeartBeatFailure extends StreamingException {
+ private Collection<Long> abortedTxns;
+ private Collection<Long> nosuchTxns;
+
+ public HeartBeatFailure(Collection<Long> abortedTxns, Set<Long> nosuchTxns) {
+ super("Heart beat error. InvalidTxns: " + nosuchTxns + ". AbortedTxns: " + abortedTxns);
+ this.abortedTxns = abortedTxns;
+ this.nosuchTxns = nosuchTxns;
+ }
+}