Posted to commits@nifi.apache.org by jo...@apache.org on 2020/09/02 17:59:22 UTC
[nifi] 12/16: NIFI-7740: Add Records Per Transaction and Transactions Per Batch properties to PutHive3Streaming
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.12.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit c10bd4990bfcc5f5fd17c3eefdb03801e7a036a9
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Mon Aug 24 21:45:00 2020 -0400
NIFI-7740: Add Records Per Transaction and Transactions Per Batch properties to PutHive3Streaming
NIFI-7740: Incorporated review comments
NIFI-7740: Restore RecordsEOFException superclass to SerializationError
This closes #4489.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../apache/hive/streaming/HiveRecordWriter.java | 19 +++++--
.../apache/hive/streaming/RecordsEOFException.java | 28 ++++++++++
.../nifi/processors/hive/PutHive3Streaming.java | 63 +++++++++++++++++-----
.../org/apache/nifi/util/hive/HiveOptions.java | 12 ++++-
.../processors/hive/TestPutHive3Streaming.java | 4 +-
5 files changed, 106 insertions(+), 20 deletions(-)
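The two new properties are meant to be sized together: as the Transactions per Batch description added below puts it, the product of Records per Transaction (when non-zero) and Transactions per Batch should exceed the largest record count expected in a single flow file, so a failed transaction batch rolls the whole flow file back. A rough worked example, with purely illustrative numbers:

    Records per Transaction  = 10000
    Transactions per Batch   = 100
    10000 * 100 = 1,000,000 records covered by one transaction batch, so any flow file
    with up to 1,000,000 records stays within a single batch and can be rolled back in full.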
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
index 6edb374..d1b55e9 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
@@ -34,14 +34,17 @@ import java.util.Properties;
public class HiveRecordWriter extends AbstractRecordWriter {
- private RecordReader recordReader;
+ private final RecordReader recordReader;
private NiFiRecordSerDe serde;
- private ComponentLog log;
+ private final ComponentLog log;
+ private final int recordsPerTransaction;
+ private int currentRecordsWritten;
- public HiveRecordWriter(RecordReader recordReader, ComponentLog log) {
+ public HiveRecordWriter(RecordReader recordReader, ComponentLog log, final int recordsPerTransaction) {
super(null);
this.recordReader = recordReader;
this.log = log;
+ this.recordsPerTransaction = recordsPerTransaction;
}
@Override
@@ -73,10 +76,16 @@ public class HiveRecordWriter extends AbstractRecordWriter {
public void write(long writeId, InputStream inputStream) throws StreamingException {
// The inputStream is already available to the recordReader, so just iterate through the records
try {
- Record record;
- while ((record = recordReader.nextRecord()) != null) {
+ Record record = null;
+ while ((++currentRecordsWritten <= recordsPerTransaction || recordsPerTransaction == 0)
+ && (record = recordReader.nextRecord()) != null) {
write(writeId, record);
}
+ // Once there are no more records, throw a RecordsEOFException to indicate the input stream is exhausted
+ if (record == null) {
+ throw new RecordsEOFException("End of transaction", new Exception());
+ }
+ currentRecordsWritten = 0;
} catch (MalformedRecordException | IOException e) {
throw new StreamingException(e.getLocalizedMessage(), e);
}
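The loop above writes at most Records per Transaction records per call (a value of 0 disables the cap), resets the counter once the cap is reached, and throws RecordsEOFException only when the reader returns null. A minimal, self-contained sketch of that behavior using plain Java stand-ins (the class, type, and method names here are illustrative, not part of the patch):

    import java.util.Iterator;
    import java.util.List;

    public class CappedWriterSketch {

        // Unchecked marker playing the role RecordsEOFException plays in the patch.
        static class EndOfRecords extends RuntimeException { }

        private final Iterator<String> records;
        private final int recordsPerTransaction; // 0 means "write everything in one transaction"
        private int currentRecordsWritten;

        CappedWriterSketch(Iterator<String> records, int recordsPerTransaction) {
            this.records = records;
            this.recordsPerTransaction = recordsPerTransaction;
        }

        // Mirrors the write loop above: stop at the cap, throw once the source is exhausted.
        void writeTransaction() {
            String record = null;
            while ((++currentRecordsWritten <= recordsPerTransaction || recordsPerTransaction == 0)
                    && (record = nextOrNull()) != null) {
                System.out.println("  wrote " + record);
            }
            if (record == null) {
                throw new EndOfRecords();
            }
            currentRecordsWritten = 0; // reset for the next transaction
        }

        private String nextOrNull() {
            return records.hasNext() ? records.next() : null;
        }

        public static void main(String[] args) {
            CappedWriterSketch writer =
                    new CappedWriterSketch(List.of("r1", "r2", "r3", "r4", "r5").iterator(), 2);
            boolean exitLoop = false;
            while (!exitLoop) { // mirrors the loop added to PutHive3Streaming below
                System.out.println("begin transaction");
                try {
                    writer.writeTransaction();
                } catch (EndOfRecords eof) {
                    exitLoop = true;
                }
                System.out.println("commit transaction");
            }
        }
    }

With five records and a cap of two, this prints three begin/commit pairs, matching the intended behavior of splitting one flow file across multiple transactions.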
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/RecordsEOFException.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/RecordsEOFException.java
new file mode 100644
index 0000000..41e641b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/RecordsEOFException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+/**
+ * This is a "marker class" used by the HiveRecordWriter to indicate there are no more records in the input stream.
+ * It is used by PutHive3Streaming to determine that all records have been written to transaction(s).
+ */
+public class RecordsEOFException extends SerializationError {
+
+ RecordsEOFException(String msg, Exception e) {
+ super(msg, e);
+ }
+}
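Keeping SerializationError as the superclass (restored by the follow-up commit noted in the message above) means existing handlers for SerializationError still catch this exception, while a caller that cares about the end-of-input case can catch it first. A small illustrative fragment of that catch ordering (variable names are placeholders, and the enclosing method is assumed to declare throws StreamingException):

    try {
        connection.write(in);     // may return early once the per-transaction cap is reached
    } catch (RecordsEOFException eof) {
        exitLoop = true;          // marker subclass: no more records, stop opening transactions
    } catch (SerializationError se) {
        throw se;                 // any other serialization problem is still a real failure
    }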
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
index 23b873f..0ba6bd2 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
@@ -24,6 +24,7 @@ import org.apache.hive.streaming.ConnectionError;
import org.apache.hive.streaming.HiveRecordWriter;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.InvalidTable;
+import org.apache.hive.streaming.RecordsEOFException;
import org.apache.hive.streaming.SerializationError;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StreamingException;
@@ -171,6 +172,28 @@ public class PutHive3Streaming extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ static final PropertyDescriptor RECORDS_PER_TXN = new PropertyDescriptor.Builder()
+ .name("hive3-stream-records-per-transaction")
+ .displayName("Records per Transaction")
+ .description("Number of records to process before committing the transaction. If set to zero (0), all records will be written in a single transaction.")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .defaultValue("0")
+ .build();
+
+ static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
+ .name("hive3-stream-transactions-per-batch")
+ .displayName("Transactions per Batch")
+ .description("A hint to Hive Streaming indicating how many transactions the processor task will need. The product of Records per Transaction (if not zero) "
+ + "and Transactions per Batch should be larger than the largest expected number of records in the flow file(s); this will ensure any failed "
+ + "transaction batches cause a full rollback.")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("1")
+ .build();
+
static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
.name("hive3-stream-call-timeout")
.displayName("Call Timeout")
@@ -269,6 +292,8 @@ public class PutHive3Streaming extends AbstractProcessor {
props.add(DB_NAME);
props.add(TABLE_NAME);
props.add(STATIC_PARTITION_VALUES);
+ props.add(RECORDS_PER_TXN);
+ props.add(TXNS_PER_BATCH);
props.add(CALL_TIMEOUT);
props.add(DISABLE_STREAMING_OPTIMIZATIONS);
props.add(ROLLBACK_ON_FAILURE);
@@ -355,10 +380,10 @@ public class PutHive3Streaming extends AbstractProcessor {
if (resolvedKeytab != null) {
kerberosUserReference.set(new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
- log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+ log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
} else if (explicitPassword != null) {
kerberosUserReference.set(new KerberosPasswordUser(resolvedPrincipal, explicitPassword));
- log.info("Hive Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal});
+ log.info("Hive Security Enabled, logging in as principal {} with password", new Object[]{resolvedPrincipal});
} else {
throw new ProcessException("Unable to authenticate with Kerberos, no keytab or password was provided");
}
@@ -409,13 +434,17 @@ public class PutHive3Streaming extends AbstractProcessor {
// Override the Hive Metastore URIs in the config if set by the user
if (metastoreURIs != null) {
- hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs);
+ hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs);
}
+ final int recordsPerTransaction = context.getProperty(RECORDS_PER_TXN).evaluateAttributeExpressions(flowFile).asInteger();
+ final int transactionsPerBatch = context.getProperty(TXNS_PER_BATCH).evaluateAttributeExpressions(flowFile).asInteger();
+
HiveOptions o = new HiveOptions(metastoreURIs, dbName, tableName)
.withHiveConf(hiveConfig)
.withCallTimeout(callTimeout)
- .withStreamingOptimizations(!disableStreamingOptimizations);
+ .withStreamingOptimizations(!disableStreamingOptimizations)
+ .withTransactionBatchSize(transactionsPerBatch);
if (!StringUtils.isEmpty(staticPartitionValuesString)) {
List<String> staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
@@ -444,7 +473,7 @@ public class PutHive3Streaming extends AbstractProcessor {
try {
final RecordReader reader;
- try(final InputStream in = session.read(flowFile)) {
+ try (final InputStream in = session.read(flowFile)) {
// if we fail to create the RecordReader then we want to route to failure, so we need to
// handle this separately from the other IOExceptions which normally route to retry
try {
@@ -453,12 +482,21 @@ public class PutHive3Streaming extends AbstractProcessor {
throw new RecordReaderFactoryException("Unable to create RecordReader", e);
}
- hiveStreamingConnection = makeStreamingConnection(options, reader);
+ hiveStreamingConnection = makeStreamingConnection(options, reader, recordsPerTransaction);
// Write records to Hive streaming, then commit and close
- hiveStreamingConnection.beginTransaction();
- hiveStreamingConnection.write(in);
- hiveStreamingConnection.commitTransaction();
+ boolean exitLoop = false;
+ while (!exitLoop) {
+ hiveStreamingConnection.beginTransaction();
+ // The HiveRecordWriter keeps track of records per transaction and will complete writing for the transaction
+ // once the limit has been reached. It is then reset for the next iteration of the loop.
+ try {
+ hiveStreamingConnection.write(in);
+ } catch (RecordsEOFException reofe) {
+ exitLoop = true;
+ }
+ hiveStreamingConnection.commitTransaction();
+ }
in.close();
Map<String, String> updateAttributes = new HashMap<>();
@@ -560,13 +598,14 @@ public class PutHive3Streaming extends AbstractProcessor {
});
}
- StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
+ StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader, int recordsPerTransaction) throws StreamingException {
return HiveStreamingConnection.newBuilder()
.withDatabase(options.getDatabaseName())
.withTable(options.getTableName())
.withStaticPartitionValues(options.getStaticPartitionValues())
.withHiveConf(options.getHiveConf())
- .withRecordWriter(new HiveRecordWriter(reader, getLogger()))
+ .withRecordWriter(new HiveRecordWriter(reader, getLogger(), recordsPerTransaction))
+ .withTransactionBatchSize(options.getTransactionBatchSize())
.withAgentInfo("NiFi " + this.getClass().getSimpleName() + " [" + this.getIdentifier()
+ "] thread " + Thread.currentThread().getId() + "[" + Thread.currentThread().getName() + "]")
.connect();
@@ -642,7 +681,7 @@ public class PutHive3Streaming extends AbstractProcessor {
KerberosUser kerberosUser = kerberosUserReference.get();
getLogger().debug("kerberosUser is " + kerberosUser);
try {
- getLogger().debug("checking TGT on kerberosUser [{}]", new Object[] {kerberosUser});
+ getLogger().debug("checking TGT on kerberosUser [{}]", new Object[]{kerberosUser});
kerberosUser.checkTGTAndRelogin();
} catch (LoginException e) {
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
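Both new properties are set by the names added above and support flow file attribute expression language. A rough configuration sketch using NiFi's mock framework (the class name and property values are illustrative, and the record reader plus Hive configuration a real flow needs are omitted):

    import org.apache.nifi.processors.hive.PutHive3Streaming;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class PutHive3StreamingConfigSketch {
        public static void main(String[] args) {
            TestRunner runner = TestRunners.newTestRunner(PutHive3Streaming.class);
            // Commit a transaction every 10000 records; "0" keeps the pre-patch single-transaction behavior.
            runner.setProperty("hive3-stream-records-per-transaction", "10000");
            // Hint to Hive Streaming that up to 100 transactions may be needed per transaction batch.
            runner.setProperty("hive3-stream-transactions-per-batch", "100");
            // Either value could instead come from a flow file attribute, e.g. "${records.per.txn}".
        }
    }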
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
index 82f6856..7efa106 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -36,6 +36,7 @@ public class HiveOptions implements Serializable {
protected String kerberosKeytab;
protected HiveConf hiveConf;
protected boolean streamingOptimizations = true;
+ protected int transactionBatchSize = 1;
public HiveOptions(String metaStoreURI, String databaseName, String tableName) {
this.metaStoreURI = metaStoreURI;
@@ -73,6 +74,11 @@ public class HiveOptions implements Serializable {
return this;
}
+ public HiveOptions withTransactionBatchSize(int transactionBatchSize) {
+ this.transactionBatchSize = transactionBatchSize;
+ return this;
+ }
+
public String getMetaStoreURI() {
return metaStoreURI;
}
@@ -108,4 +114,8 @@ public class HiveOptions implements Serializable {
public boolean getStreamingOptimizations() {
return streamingOptimizations;
}
+
+ public int getTransactionBatchSize() {
+ return transactionBatchSize;
+ }
}
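withTransactionBatchSize follows the same fluent style as the existing HiveOptions setters, so it chains with the other with* calls, mirroring the processor change above. An illustrative fragment (URI, database, and table values are placeholders):

    HiveOptions options = new HiveOptions("thrift://metastore-host:9083", "default", "my_table")
            .withCallTimeout(0)
            .withStreamingOptimizations(true)
            .withTransactionBatchSize(100); // new in this commit; read back via getTransactionBatchSize()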
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
index 2b6487e..05e44fb 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
@@ -1142,7 +1142,7 @@ public class TestPutHive3Streaming {
}
@Override
- StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
+ StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader, int recordsPerTransaction) throws StreamingException {
// Test here to ensure the 'hive.metastore.uris' property matches the options.getMetastoreUri() value (if it is set)
String userDefinedMetastoreURI = options.getMetaStoreURI();
@@ -1154,7 +1154,7 @@ public class TestPutHive3Streaming {
throw new StubConnectionError("Unit Test - Connection Error");
}
- HiveRecordWriter hiveRecordWriter = new HiveRecordWriter(reader, getLogger());
+ HiveRecordWriter hiveRecordWriter = new HiveRecordWriter(reader, getLogger(), 0);
if (generatePermissionsFailure) {
throw new StreamingException("Permission denied");
}