You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/05/02 16:44:59 UTC
[3/7] hive git commit: HIVE-19211: New streaming ingest API and
support for dynamic partitioning (Prasanth Jayachandran reviewed by Eugene
Koifman)
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java b/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
deleted file mode 100644
index 0011b14..0000000
--- a/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-public class InvalidColumn extends StreamingException {
-
- public InvalidColumn(String msg) {
- super(msg);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java b/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
deleted file mode 100644
index f1f9804..0000000
--- a/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-public class InvalidPartition extends StreamingException {
-
- public InvalidPartition(String partitionName, String partitionValue) {
- super("Invalid partition: Name=" + partitionName +
- ", Value=" + partitionValue);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/InvalidTable.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidTable.java b/streaming/src/java/org/apache/hive/streaming/InvalidTable.java
index ef1c91d..5c60160 100644
--- a/streaming/src/java/org/apache/hive/streaming/InvalidTable.java
+++ b/streaming/src/java/org/apache/hive/streaming/InvalidTable.java
@@ -24,15 +24,11 @@ public class InvalidTable extends StreamingException {
return "Invalid table db:" + db + ", table:" + table;
}
- public InvalidTable(String db, String table) {
- super(makeMsg(db,table), null);
- }
-
- public InvalidTable(String db, String table, String msg) {
+ InvalidTable(String db, String table, String msg) {
super(makeMsg(db, table) + ": " + msg, null);
}
- public InvalidTable(String db, String table, Exception inner) {
+ InvalidTable(String db, String table, Exception inner) {
super(makeMsg(db, table) + ": " + inner.getMessage(), inner);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java b/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java
new file mode 100644
index 0000000..9d92dfa
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/InvalidTransactionState.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidTransactionState extends TransactionError {
+ InvalidTransactionState(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java b/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
deleted file mode 100644
index 762f5f8..0000000
--- a/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-public class InvalidTrasactionState extends TransactionError {
- public InvalidTrasactionState(String msg) {
- super(msg);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java b/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
index 5f9aca6..e464399 100644
--- a/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
+++ b/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
@@ -19,7 +19,7 @@
package org.apache.hive.streaming;
public class PartitionCreationFailed extends StreamingException {
- public PartitionCreationFailed(HiveEndPoint endPoint, Exception cause) {
- super("Failed to create partition " + endPoint, cause);
+ PartitionCreationFailed(StreamingConnection connection, Throwable cause) {
+ super("Failed to create partition " + connection, cause);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/PartitionHandler.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/PartitionHandler.java b/streaming/src/java/org/apache/hive/streaming/PartitionHandler.java
new file mode 100644
index 0000000..1b0e7de
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/PartitionHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.util.List;
+
+public interface PartitionHandler {
+
+ /**
+ * Creates a partition if it does not exist.
+ *
+ * @param partitionValues - partition values
+ * @return partition info (name, location and exists flag of the partition)
+ * @throws StreamingException - any metastore related exceptions
+ */
+ PartitionInfo createPartitionIfNotExists(List<String> partitionValues) throws StreamingException;
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java b/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java
new file mode 100644
index 0000000..ce9f76a
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/PartitionInfo.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+/**
+ * Simple wrapper class for minimal partition related information used by streaming ingest.
+ */
+public class PartitionInfo {
+ private String name;
+ private String partitionLocation;
+ private boolean exists;
+
+ public PartitionInfo(final String name, final String partitionLocation, final boolean exists) {
+ this.name = name;
+ this.partitionLocation = partitionLocation;
+ this.exists = exists;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(final String name) {
+ this.name = name;
+ }
+
+ public String getPartitionLocation() {
+ return partitionLocation;
+ }
+
+ public void setPartitionLocation(final String partitionLocation) {
+ this.partitionLocation = partitionLocation;
+ }
+
+ public boolean isExists() {
+ return exists;
+ }
+
+ public void setExists(final boolean exists) {
+ this.exists = exists;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java b/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java
deleted file mode 100644
index ccd3ae0..0000000
--- a/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-public class QueryFailedException extends StreamingException {
- String query;
-
- public QueryFailedException(String query, Exception e) {
- super("Query failed: " + query + ". Due to :" + e.getMessage(), e);
- this.query = query;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
index dc6d70e..4d25924 100644
--- a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
@@ -19,25 +19,44 @@
package org.apache.hive.streaming;
+import java.util.Set;
+
public interface RecordWriter {
- /** Writes using a hive RecordUpdater
+ /**
+ * Initialize record writer.
+ *
+ * @param connection - streaming connection
+ * @param minWriteId - min write id
+ * @param maxWriteID - max write id
+ * @throws StreamingException - thrown when initialization failed
+ */
+ void init(StreamingConnection connection, long minWriteId, long maxWriteID) throws StreamingException;
+
+ /**
+ * Writes using a hive RecordUpdater
*
* @param writeId the write ID of the table mapping to Txn in which the write occurs
- * @param record the record to be written
+ * @param record the record to be written
*/
void write(long writeId, byte[] record) throws StreamingException;
- /** Flush records from buffer. Invoked by TransactionBatch.commit() */
+ /**
+ * Flush records from buffer. Invoked by TransactionBatch.commitTransaction()
+ */
void flush() throws StreamingException;
- /** Clear bufferred writes. Invoked by TransactionBatch.abort() */
- void clear() throws StreamingException;
-
- /** Acquire a new RecordUpdater. Invoked when
- * StreamingConnection.fetchTransactionBatch() is called */
- void newBatch(Long minWriteId, Long maxWriteID) throws StreamingException;
+ /**
+ * Close the RecordUpdater. Invoked by TransactionBatch.close()
+ *
+ * @throws StreamingException - thrown when record writer cannot be closed.
+ */
+ void close() throws StreamingException;
- /** Close the RecordUpdater. Invoked by TransactionBatch.close() */
- void closeBatch() throws StreamingException;
+ /**
+ * Get the set of partitions that were added by the record writer.
+ *
+ * @return - set of partitions
+ */
+ Set<String> getPartitions();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/SerializationError.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/SerializationError.java b/streaming/src/java/org/apache/hive/streaming/SerializationError.java
index a57ba00..1473ff8 100644
--- a/streaming/src/java/org/apache/hive/streaming/SerializationError.java
+++ b/streaming/src/java/org/apache/hive/streaming/SerializationError.java
@@ -20,7 +20,7 @@ package org.apache.hive.streaming;
public class SerializationError extends StreamingException {
- public SerializationError(String msg, Exception e) {
+ SerializationError(String msg, Exception e) {
super(msg,e);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
index 2f760ea..cd7f3d8 100644
--- a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
@@ -18,40 +18,47 @@
package org.apache.hive.streaming;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hive.conf.HiveConf;
-/**
- * Represents a connection to a HiveEndPoint. Used to acquire transaction batches.
- * Note: the expectation is that there is at most 1 TransactionBatch outstanding for any given
- * StreamingConnection. Violating this may result in "out of sequence response".
- */
-public interface StreamingConnection {
+public interface StreamingConnection extends ConnectionInfo, PartitionHandler {
+ /**
+ * Returns hive configuration object used during connection creation.
+ *
+ * @return - hive conf
+ */
+ HiveConf getHiveConf();
+
+ /**
+ * Begin a transaction for writing.
+ *
+ * @throws StreamingException - if there are errors when beginning transaction
+ */
+ void beginTransaction() throws StreamingException;
+
+ /**
+ * Write record using RecordWriter.
+ *
+ * @param record - the data to be written
+ * @throws StreamingException - if there are errors when writing
+ */
+ void write(byte[] record) throws StreamingException;
/**
- * Acquires a new batch of transactions from Hive.
-
- * @param numTransactionsHint is a hint from client indicating how many transactions client needs.
- * @param writer Used to write record. The same writer instance can
- * be shared with another TransactionBatch (to the same endpoint)
- * only after the first TransactionBatch has been closed.
- * Writer will be closed when the TransactionBatch is closed.
- * @return
- * @throws ConnectionError
- * @throws InvalidPartition
- * @throws StreamingException
- * @return a batch of transactions
+ * Commit a transaction to make the writes visible for readers.
+ *
+ * @throws StreamingException - if there are errors when committing the open transaction
*/
- public TransactionBatch fetchTransactionBatch(int numTransactionsHint,
- RecordWriter writer)
- throws ConnectionError, StreamingException, InterruptedException;
+ void commitTransaction() throws StreamingException;
/**
- * Close connection
+ * Manually abort the opened transaction.
+ *
+ * @throws StreamingException - if there are errors when aborting the transaction
*/
- public void close();
+ void abortTransaction() throws StreamingException;
/**
- * @return UserGroupInformation associated with this connection or {@code null} if there is none
+ * Closes streaming connection.
*/
- UserGroupInformation getUserGroupInformation();
+ void close();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/StreamingException.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingException.java b/streaming/src/java/org/apache/hive/streaming/StreamingException.java
index a7f84c1..1af5c6a 100644
--- a/streaming/src/java/org/apache/hive/streaming/StreamingException.java
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingException.java
@@ -19,7 +19,7 @@
package org.apache.hive.streaming;
public class StreamingException extends Exception {
- public StreamingException(String msg, Exception cause) {
+ public StreamingException(String msg, Throwable cause) {
super(msg, cause);
}
public StreamingException(String msg) {
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java b/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
index 0dfbfa7..090167d 100644
--- a/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
@@ -21,11 +21,11 @@ package org.apache.hive.streaming;
public class StreamingIOFailure extends StreamingException {
- public StreamingIOFailure(String msg, Exception cause) {
+ StreamingIOFailure(String msg, Exception cause) {
super(msg, cause);
}
- public StreamingIOFailure(String msg) {
+ StreamingIOFailure(String msg) {
super(msg);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
new file mode 100644
index 0000000..4a07435
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+import java.util.Properties;
+
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Streaming Writer handles delimited input (e.g. CSV).
+ * Delimited input is parsed to extract partition values and bucketing info, and is then forwarded to the record updater.
+ * Uses Lazy Simple SerDe to process delimited input.
+ *
+ * NOTE: This record writer is NOT thread-safe. Use one record writer per streaming connection.
+ */
+public class StrictDelimitedInputWriter extends AbstractRecordWriter {
+ private char fieldDelimiter;
+ private char collectionDelimiter;
+ private char mapKeyDelimiter;
+ private LazySimpleSerDe serde;
+
+ private StrictDelimitedInputWriter(Builder builder) {
+ this.fieldDelimiter = builder.fieldDelimiter;
+ this.collectionDelimiter = builder.collectionDelimiter;
+ this.mapKeyDelimiter = builder.mapKeyDelimiter;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private char fieldDelimiter = (char) LazySerDeParameters.DefaultSeparators[0];
+ private char collectionDelimiter = (char) LazySerDeParameters.DefaultSeparators[1];
+ private char mapKeyDelimiter = (char) LazySerDeParameters.DefaultSeparators[2];
+
+ public Builder withFieldDelimiter(final char fieldDelimiter) {
+ this.fieldDelimiter = fieldDelimiter;
+ return this;
+ }
+
+ public Builder withCollectionDelimiter(final char collectionDelimiter) {
+ this.collectionDelimiter = collectionDelimiter;
+ return this;
+ }
+
+ public Builder withMapKeyDelimiter(final char mapKeyDelimiter) {
+ this.mapKeyDelimiter = mapKeyDelimiter;
+ return this;
+ }
+
+ public StrictDelimitedInputWriter build() {
+ return new StrictDelimitedInputWriter(this);
+ }
+ }
+
+ @Override
+ public Object encode(byte[] record) throws SerializationError {
+ try {
+ BytesWritable blob = new BytesWritable();
+ blob.set(record, 0, record.length);
+ return serde.deserialize(blob);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to convert byte[] record into Object", e);
+ }
+ }
+
+ @Override
+ public LazySimpleSerDe createSerde() throws SerializationError {
+ try {
+ Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+ tableProps.setProperty(serdeConstants.LIST_COLUMNS, Joiner.on(",").join(inputColumns));
+ tableProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, Joiner.on(":").join(inputTypes));
+ tableProps.setProperty(serdeConstants.FIELD_DELIM, String.valueOf(fieldDelimiter));
+ tableProps.setProperty(serdeConstants.COLLECTION_DELIM, String.valueOf(collectionDelimiter));
+ tableProps.setProperty(serdeConstants.MAPKEY_DELIM, String.valueOf(mapKeyDelimiter));
+ LazySimpleSerDe serde = new LazySimpleSerDe();
+ SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+ this.serde = serde;
+ return serde;
+ } catch (SerDeException e) {
+ throw new SerializationError("Error initializing serde", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
index 0077913..1600e7c 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
@@ -18,131 +18,45 @@
package org.apache.hive.streaming;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Table;
+import java.util.Properties;
+
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.JsonSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.io.Text;
-import org.apache.hive.hcatalog.data.HCatRecordObjectInspector;
-import org.apache.hive.hcatalog.data.JsonSerDe;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
/**
* Streaming Writer handles utf8 encoded Json (Strict syntax).
- * Uses org.apache.hive.hcatalog.data.JsonSerDe to process Json input
+ * Uses org.apache.hadoop.hive.serde2.JsonSerDe to process Json input
+ *
+ * NOTE: This record writer is NOT thread-safe. Use one record writer per streaming connection.
*/
public class StrictJsonWriter extends AbstractRecordWriter {
private JsonSerDe serde;
- private final HCatRecordObjectInspector recordObjInspector;
- private final ObjectInspector[] bucketObjInspectors;
- private final StructField[] bucketStructFields;
-
- /**
- * @deprecated As of release 1.3/2.1. Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
- */
- public StrictJsonWriter(HiveEndPoint endPoint)
- throws ConnectionError, SerializationError, StreamingException {
- this(endPoint, null, null);
- }
-
- /**
- * @deprecated As of release 1.3/2.1. Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
- */
- public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) throws StreamingException {
- this(endPoint, conf, null);
- }
- /**
- * @param endPoint the end point to write to
- * @throws ConnectionError
- * @throws SerializationError
- * @throws StreamingException
- */
- public StrictJsonWriter(HiveEndPoint endPoint, StreamingConnection conn)
- throws ConnectionError, SerializationError, StreamingException {
- this(endPoint, null, conn);
+ public static Builder newBuilder() {
+ return new Builder();
}
- /**
- * @param endPoint the end point to write to
- * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
- * @param conn connection this Writer is to be used with
- * @throws ConnectionError
- * @throws SerializationError
- * @throws StreamingException
- */
- public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
- throws ConnectionError, SerializationError, StreamingException {
- super(endPoint, conf, conn);
- this.serde = createSerde(tbl, conf);
- // get ObjInspectors for entire record and bucketed cols
- try {
- recordObjInspector = ( HCatRecordObjectInspector ) serde.getObjectInspector();
- this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
- } catch (SerDeException e) {
- throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
- }
- // get StructFields for bucketed cols
- bucketStructFields = new StructField[bucketIds.size()];
- List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
- for (int i = 0; i < bucketIds.size(); i++) {
- bucketStructFields[i] = allFields.get(bucketIds.get(i));
+ public static class Builder {
+ public StrictJsonWriter build() {
+ return new StrictJsonWriter();
}
}
- @Override
- public AbstractSerDe getSerde() {
- return serde;
- }
-
- protected HCatRecordObjectInspector getRecordObjectInspector() {
- return recordObjInspector;
- }
-
- @Override
- protected StructField[] getBucketStructFields() {
- return bucketStructFields;
- }
-
- protected ObjectInspector[] getBucketObjectInspectors() {
- return bucketObjInspectors;
- }
-
-
- @Override
- public void write(long writeId, byte[] record)
- throws StreamingIOFailure, SerializationError {
- try {
- Object encodedRow = encode(record);
- int bucket = getBucket(encodedRow);
- getRecordUpdater(bucket).insert(writeId, encodedRow);
- } catch (IOException e) {
- throw new StreamingIOFailure("Error writing record in transaction write id("
- + writeId + ")", e);
- }
-
- }
-
/**
* Creates JsonSerDe
- * @param tbl used to create serde
- * @param conf used to create serde
- * @return
+ *
* @throws SerializationError if serde could not be initialized
*/
- private static JsonSerDe createSerde(Table tbl, HiveConf conf)
- throws SerializationError {
+ @Override
+ public JsonSerDe createSerde() throws SerializationError {
try {
Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
JsonSerDe serde = new JsonSerDe();
SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+ this.serde = serde;
return serde;
} catch (SerDeException e) {
throw new SerializationError("Error initializing serde " + JsonSerDe.class.getName(), e);
@@ -158,5 +72,4 @@ public class StrictJsonWriter extends AbstractRecordWriter {
throw new SerializationError("Unable to convert byte[] record into Object", e);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
index c0b7324..563cf66 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
@@ -18,163 +18,73 @@
package org.apache.hive.streaming;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Properties;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.RegexSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.Text;
/**
* Streaming Writer handles text input data with regex. Uses
* org.apache.hadoop.hive.serde2.RegexSerDe
+ *
+ * NOTE: This record writer is NOT thread-safe. Use one record writer per streaming connection.
*/
public class StrictRegexWriter extends AbstractRecordWriter {
+ private String regex;
private RegexSerDe serde;
- private final StructObjectInspector recordObjInspector;
- private final ObjectInspector[] bucketObjInspectors;
- private final StructField[] bucketStructFields;
- /**
- * @param endPoint the end point to write to
- * @param conn connection this Writer is to be used with
- * @throws ConnectionError
- * @throws SerializationError
- * @throws StreamingException
- */
- public StrictRegexWriter(HiveEndPoint endPoint, StreamingConnection conn)
- throws ConnectionError, SerializationError, StreamingException {
- this(null, endPoint, null, conn);
+ private StrictRegexWriter(final Builder builder) {
+ this.regex = builder.regex;
}
- /**
- * @param endPoint the end point to write to
- * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
- * @param conn connection this Writer is to be used with
- * @throws ConnectionError
- * @throws SerializationError
- * @throws StreamingException
- */
- public StrictRegexWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
- throws ConnectionError, SerializationError, StreamingException {
- this(null, endPoint, conf, conn);
+ public static Builder newBuilder() {
+ return new Builder();
}
- /**
- * @param regex to parse the data
- * @param endPoint the end point to write to
- * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
- * @param conn connection this Writer is to be used with
- * @throws ConnectionError
- * @throws SerializationError
- * @throws StreamingException
- */
- public StrictRegexWriter(String regex, HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
- throws ConnectionError, SerializationError, StreamingException {
- super(endPoint, conf, conn);
- this.serde = createSerde(tbl, conf, regex);
- // get ObjInspectors for entire record and bucketed cols
- try {
- recordObjInspector = (StructObjectInspector) serde.getObjectInspector();
- this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
- } catch (SerDeException e) {
- throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
- }
+ public static class Builder {
+ private String regex;
- // get StructFields for bucketed cols
- bucketStructFields = new StructField[bucketIds.size()];
- List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
- for (int i = 0; i < bucketIds.size(); i++) {
- bucketStructFields[i] = allFields.get(bucketIds.get(i));
+ public Builder withRegex(final String regex) {
+ this.regex = regex;
+ return this;
}
- }
-
- @Override
- public AbstractSerDe getSerde() {
- return serde;
- }
-
- @Override
- protected StructObjectInspector getRecordObjectInspector() {
- return recordObjInspector;
- }
-
- @Override
- protected StructField[] getBucketStructFields() {
- return bucketStructFields;
- }
-
- @Override
- protected ObjectInspector[] getBucketObjectInspectors() {
- return bucketObjInspectors;
- }
-
- @Override
- public void write(long writeId, byte[] record)
- throws StreamingIOFailure, SerializationError {
- try {
- Object encodedRow = encode(record);
- int bucket = getBucket(encodedRow);
- getRecordUpdater(bucket).insert(writeId, encodedRow);
- } catch (IOException e) {
- throw new StreamingIOFailure("Error writing record in transaction write id("
- + writeId + ")", e);
+ public StrictRegexWriter build() {
+ return new StrictRegexWriter(this);
}
}
/**
* Creates RegexSerDe
*
- * @param tbl used to create serde
- * @param conf used to create serde
- * @param regex used to create serde
- * @return
* @throws SerializationError if serde could not be initialized
*/
- private static RegexSerDe createSerde(Table tbl, HiveConf conf, String regex)
- throws SerializationError {
+ @Override
+ public RegexSerDe createSerde() throws SerializationError {
try {
Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
- ArrayList<String> tableColumns = getCols(tbl);
- tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(tableColumns, ","));
+ tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(inputColumns, ","));
RegexSerDe serde = new RegexSerDe();
SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+ this.serde = serde;
return serde;
} catch (SerDeException e) {
throw new SerializationError("Error initializing serde " + RegexSerDe.class.getName(), e);
}
}
- private static ArrayList<String> getCols(Table table) {
- List<FieldSchema> cols = table.getSd().getCols();
- ArrayList<String> colNames = new ArrayList<String>(cols.size());
- for (FieldSchema col : cols) {
- colNames.add(col.getName().toLowerCase());
- }
- return colNames;
- }
-
/**
* Encode Utf8 encoded string bytes using RegexSerDe
*
- * @param utf8StrRecord
+ * @param utf8StrRecord - serialized record
* @return The encoded object
- * @throws SerializationError
+ * @throws SerializationError - in case of any deserialization error
*/
@Override
public Object encode(byte[] utf8StrRecord) throws SerializationError {
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
deleted file mode 100644
index 2b05771..0000000
--- a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-
-import java.util.Collection;
-
-/**
- * Represents a set of Transactions returned by Hive. Supports opening, writing to
- * and commiting/aborting each transaction. The interface is designed to ensure
- * transactions in a batch are used up sequentially. To stream to the same HiveEndPoint
- * concurrently, create separate StreamingConnections.
- *
- * Note on thread safety: At most 2 threads can run through a given TransactionBatch at the same
- * time. One thread may call {@link #heartbeat()} and the other all other methods.
- * Violating this may result in "out of sequence response".
- *
- */
-public interface TransactionBatch {
- enum TxnState {
- INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A");
-
- private final String code;
- TxnState(String code) {
- this.code = code;
- };
- public String toString() {
- return code;
- }
- }
-
- /**
- * Activate the next available transaction in the current transaction batch.
- * @throws StreamingException if not able to switch to next Txn
- * @throws InterruptedException if call in interrupted
- */
- void beginNextTransaction() throws StreamingException, InterruptedException;
-
- /**
- * Get Id of currently open transaction.
- * @return transaction id
- */
- Long getCurrentTxnId();
-
-
- /**
- * Get write Id mapping to currently open transaction.
- * @return write id
- */
- Long getCurrentWriteId();
-
- /**
- * get state of current transaction.
- */
- TxnState getCurrentTransactionState();
-
- /**
- * Commit the currently open transaction.
- * @throws StreamingException if there are errors committing
- * @throws InterruptedException if call in interrupted
- */
- void commit() throws StreamingException, InterruptedException;
-
- /**
- * Abort the currently open transaction.
- * @throws StreamingException if there are errors
- * @throws InterruptedException if call in interrupted
- */
- void abort() throws StreamingException, InterruptedException;
-
- /**
- * Remaining transactions are the ones that are not committed or aborted or open.
- * Current open transaction is not considered part of remaining txns.
- * @return number of transactions remaining this batch.
- */
- int remainingTransactions();
-
-
- /**
- * Write record using RecordWriter.
- * @param record the data to be written
- * @throws StreamingException if there are errors when writing
- * @throws InterruptedException if call in interrupted
- */
- void write(byte[] record) throws StreamingException, InterruptedException;
-
- /**
- * Write records using RecordWriter.
- * @throws StreamingException if there are errors when writing
- * @throws InterruptedException if call in interrupted
- */
- void write(Collection<byte[]> records) throws StreamingException, InterruptedException;
-
-
- /**
- * Issues a heartbeat to hive metastore on the current and remaining txn ids
- * to keep them from expiring.
- * @throws StreamingException if there are errors
- */
- void heartbeat() throws StreamingException;
-
- /**
- * Close the TransactionBatch.
- * @throws StreamingException if there are errors closing batch
- * @throws InterruptedException if call in interrupted
- */
- void close() throws StreamingException, InterruptedException;
- boolean isClosed();
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java b/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java
deleted file mode 100644
index a8c8cd4..0000000
--- a/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-public class TransactionBatchUnAvailable extends StreamingException {
- public TransactionBatchUnAvailable(HiveEndPoint ep, Exception e) {
- super("Unable to acquire transaction batch on end point: " + ep, e);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/java/org/apache/hive/streaming/TransactionError.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionError.java b/streaming/src/java/org/apache/hive/streaming/TransactionError.java
index a331b20..ae56e7c 100644
--- a/streaming/src/java/org/apache/hive/streaming/TransactionError.java
+++ b/streaming/src/java/org/apache/hive/streaming/TransactionError.java
@@ -19,11 +19,11 @@
package org.apache.hive.streaming;
public class TransactionError extends StreamingException {
- public TransactionError(String msg, Exception e) {
+ TransactionError(String msg, Exception e) {
super(msg + (e == null ? "" : ": " + e.getMessage()), e);
}
- public TransactionError(String msg) {
+ TransactionError(String msg) {
super(msg);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf8d305a/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java b/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
deleted file mode 100644
index f0843a1..0000000
--- a/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.streaming;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-import junit.framework.Assert;
-
-public class TestDelimitedInputWriter {
- @Test
- public void testFieldReordering() throws Exception {
-
- ArrayList<String> colNames = Lists.newArrayList(new String[]{"col1", "col2", "col3", "col4", "col5"});
- {//1) test dropping fields - first middle & last
- String[] fieldNames = {null, "col2", null, "col4", null};
- int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
- Assert.assertTrue(Arrays.equals(mapping, new int[]{-1, 1, -1, 3, -1}));
- }
-
- {//2) test reordering
- String[] fieldNames = {"col5", "col4", "col3", "col2", "col1"};
- int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
- Assert.assertTrue( Arrays.equals(mapping, new int[]{4,3,2,1,0}) );
- }
-
- {//3) test bad field names
- String[] fieldNames = {"xyz", "abc", "col3", "col4", "as"};
- try {
- DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
- Assert.fail();
- } catch (InvalidColumn e) {
- // should throw
- }
- }
-
- {//4) test few field names
- String[] fieldNames = {"col3", "col4"};
- int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
- Assert.assertTrue( Arrays.equals(mapping, new int[]{2,3}) );
- }
-
- {//5) test extra field names
- String[] fieldNames = {"col5", "col4", "col3", "col2", "col1", "col1"};
- try {
- DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
- Assert.fail();
- } catch (InvalidColumn e) {
- //show throw
- }
- }
- }
-}