Posted to commits@nifi.apache.org by jo...@apache.org on 2020/09/02 17:59:22 UTC

[nifi] 12/16: NIFI-7740: Add Records Per Transaction and Transactions Per Batch properties to PutHive3Streaming

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.12.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit c10bd4990bfcc5f5fd17c3eefdb03801e7a036a9
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Mon Aug 24 21:45:00 2020 -0400

    NIFI-7740: Add Records Per Transaction and Transactions Per Batch properties to PutHive3Streaming
    
    NIFI-7740: Incorporated review comments
    
    NIFI-7740: Restore RecordsEOFException superclass to SerializationError
    
    This closes #4489.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../apache/hive/streaming/HiveRecordWriter.java    | 19 +++++--
 .../apache/hive/streaming/RecordsEOFException.java | 28 ++++++++++
 .../nifi/processors/hive/PutHive3Streaming.java    | 63 +++++++++++++++++-----
 .../org/apache/nifi/util/hive/HiveOptions.java     | 12 ++++-
 .../processors/hive/TestPutHive3Streaming.java     |  4 +-
 5 files changed, 106 insertions(+), 20 deletions(-)
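
The sizing guidance for the two new properties (see the Transactions per Batch
description below) can be made concrete with a quick sketch; the numbers here
are illustrative and not taken from this commit:

    // Records per Transaction * Transactions per Batch should be larger than
    // the largest expected record count per flow file, so that a failed
    // transaction batch rolls back everything written for that flow file.
    int largestExpectedRecords = 25_000;  // illustrative
    int recordsPerTransaction  = 5_000;   // 0 means "all records in one transaction"
    int transactionsPerBatch   = 10;
    assert recordsPerTransaction * transactionsPerBatch > largestExpectedRecords;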

diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
index 6edb374..d1b55e9 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
@@ -34,14 +34,17 @@ import java.util.Properties;
 
 public class HiveRecordWriter extends AbstractRecordWriter {
 
-    private RecordReader recordReader;
+    private final RecordReader recordReader;
     private NiFiRecordSerDe serde;
-    private ComponentLog log;
+    private final ComponentLog log;
+    private final int recordsPerTransaction;
+    private int currentRecordsWritten;
 
-    public HiveRecordWriter(RecordReader recordReader, ComponentLog log) {
+    public HiveRecordWriter(RecordReader recordReader, ComponentLog log, final int recordsPerTransaction) {
         super(null);
         this.recordReader = recordReader;
         this.log = log;
+        this.recordsPerTransaction = recordsPerTransaction;
     }
 
     @Override
@@ -73,10 +76,16 @@ public class HiveRecordWriter extends AbstractRecordWriter {
     public void write(long writeId, InputStream inputStream) throws StreamingException {
         // The inputStream is already available to the recordReader, so just iterate through the records
         try {
-            Record record;
-            while ((record = recordReader.nextRecord()) != null) {
+            Record record = null;
+            while ((++currentRecordsWritten <= recordsPerTransaction || recordsPerTransaction == 0)
+                    && (record = recordReader.nextRecord()) != null) {
                 write(writeId, record);
             }
+            // Once there are no more records, throw a RecordsEOFException to indicate the input stream is exhausted
+            if (record == null) {
+                throw new RecordsEOFException("End of transaction", new Exception());
+            }
+            currentRecordsWritten = 0;
         } catch (MalformedRecordException | IOException e) {
             throw new StreamingException(e.getLocalizedMessage(), e);
         }
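
The loop above writes at most Records per Transaction records per call to
write(), then returns so the caller can commit; only when the reader is
exhausted does it raise RecordsEOFException. A standalone trace of the same
control flow, with a plain iterator standing in for NiFi's RecordReader and a
local exception standing in for RecordsEOFException (a sketch, not NiFi code):

    import java.util.Iterator;
    import java.util.List;

    public class ChunkTrace {
        static class EofSignal extends Exception {}  // stand-in for RecordsEOFException

        static int currentRecordsWritten = 0;

        // Mirrors HiveRecordWriter.write(): write up to recordsPerTxn records,
        // throw when the source is drained, reset the counter otherwise.
        static void write(Iterator<Integer> reader, int recordsPerTxn) throws EofSignal {
            Integer record = null;
            while ((++currentRecordsWritten <= recordsPerTxn || recordsPerTxn == 0)
                    && (record = reader.hasNext() ? reader.next() : null) != null) {
                // write(writeId, record) would go here
            }
            if (record == null) {
                throw new EofSignal();
            }
            currentRecordsWritten = 0;
        }

        public static void main(String[] args) throws Exception {
            Iterator<Integer> records = List.of(1, 2, 3, 4, 5, 6, 7).iterator();
            boolean exitLoop = false;
            int txn = 0;
            while (!exitLoop) {
                txn++;                                    // beginTransaction()
                try {
                    write(records, 3);
                } catch (EofSignal e) {
                    exitLoop = true;                      // last (possibly partial) chunk
                }
                System.out.println("commit txn " + txn);  // commitTransaction()
            }
        }
    }

With 7 records and a limit of 3, this commits three transactions of 3, 3, and
1 records. Note that when the record count is an exact multiple of the limit,
the final write() call writes nothing before signaling EOF, so one empty
transaction is committed.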
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/RecordsEOFException.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/RecordsEOFException.java
new file mode 100644
index 0000000..41e641b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/RecordsEOFException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+/**
+ * This is a "marker class" used by the HiveRecordWriter to indicate there are no more records in the input stream.
+ * It is used by PutHive3Streaming to determine that all records have been written to transaction(s).
+ */
+public class RecordsEOFException extends SerializationError {
+
+    RecordsEOFException(String msg, Exception e) {
+        super(msg, e);
+    }
+}
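
Because RecordsEOFException extends SerializationError (itself a subclass of
Hive Streaming's StreamingException), it propagates out of
StreamingConnection.write() like any other streaming failure, yet the caller
can single it out as a control signal. A sketch of the catch ordering this
enables (error handling elided):

    try {
        hiveStreamingConnection.write(in);
    } catch (RecordsEOFException eof) {
        // expected signal: input exhausted, commit the final transaction and stop
    } catch (SerializationError se) {
        // a genuine serialization failure; must come after the more specific
        // RecordsEOFException or it would swallow the marker
    }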
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
index 23b873f..0ba6bd2 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
@@ -24,6 +24,7 @@ import org.apache.hive.streaming.ConnectionError;
 import org.apache.hive.streaming.HiveRecordWriter;
 import org.apache.hive.streaming.HiveStreamingConnection;
 import org.apache.hive.streaming.InvalidTable;
+import org.apache.hive.streaming.RecordsEOFException;
 import org.apache.hive.streaming.SerializationError;
 import org.apache.hive.streaming.StreamingConnection;
 import org.apache.hive.streaming.StreamingException;
@@ -171,6 +172,28 @@ public class PutHive3Streaming extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor RECORDS_PER_TXN = new PropertyDescriptor.Builder()
+            .name("hive3-stream-records-per-transaction")
+            .displayName("Records per Transaction")
+            .description("Number of records to process before committing the transaction. If set to zero (0), all records will be written in a single transaction.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .defaultValue("0")
+            .build();
+
+    static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
+            .name("hive3-stream-transactions-per-batch")
+            .displayName("Transactions per Batch")
+            .description("A hint to Hive Streaming indicating how many transactions the processor task will need. The product of Records per Transaction (if not zero) "
+                    + "and Transactions per Batch should be larger than the largest expected number of records in the flow file(s), this will ensure any failed "
+                    + "transaction batches cause a full rollback.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .build();
+
     static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
             .name("hive3-stream-call-timeout")
             .displayName("Call Timeout")
@@ -269,6 +292,8 @@ public class PutHive3Streaming extends AbstractProcessor {
         props.add(DB_NAME);
         props.add(TABLE_NAME);
         props.add(STATIC_PARTITION_VALUES);
+        props.add(RECORDS_PER_TXN);
+        props.add(TXNS_PER_BATCH);
         props.add(CALL_TIMEOUT);
         props.add(DISABLE_STREAMING_OPTIMIZATIONS);
         props.add(ROLLBACK_ON_FAILURE);
@@ -355,10 +380,10 @@ public class PutHive3Streaming extends AbstractProcessor {
 
             if (resolvedKeytab != null) {
                 kerberosUserReference.set(new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
-                log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+                log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
             } else if (explicitPassword != null) {
                 kerberosUserReference.set(new KerberosPasswordUser(resolvedPrincipal, explicitPassword));
-                log.info("Hive Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal});
+                log.info("Hive Security Enabled, logging in as principal {} with password", new Object[]{resolvedPrincipal});
             } else {
                 throw new ProcessException("Unable to authenticate with Kerberos, no keytab or password was provided");
             }
@@ -409,13 +434,17 @@ public class PutHive3Streaming extends AbstractProcessor {
 
             // Override the Hive Metastore URIs in the config if set by the user
             if (metastoreURIs != null) {
-               hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs);
+                hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs);
             }
 
+            final int recordsPerTransaction = context.getProperty(RECORDS_PER_TXN).evaluateAttributeExpressions(flowFile).asInteger();
+            final int transactionsPerBatch = context.getProperty(TXNS_PER_BATCH).evaluateAttributeExpressions(flowFile).asInteger();
+
             HiveOptions o = new HiveOptions(metastoreURIs, dbName, tableName)
                     .withHiveConf(hiveConfig)
                     .withCallTimeout(callTimeout)
-                    .withStreamingOptimizations(!disableStreamingOptimizations);
+                    .withStreamingOptimizations(!disableStreamingOptimizations)
+                    .withTransactionBatchSize(transactionsPerBatch);
 
             if (!StringUtils.isEmpty(staticPartitionValuesString)) {
                 List<String> staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
@@ -444,7 +473,7 @@ public class PutHive3Streaming extends AbstractProcessor {
             try {
                 final RecordReader reader;
 
-                try(final InputStream in = session.read(flowFile)) {
+                try (final InputStream in = session.read(flowFile)) {
                     // if we fail to create the RecordReader then we want to route to failure, so we need to
                     // handle this separately from the other IOExceptions which normally route to retry
                     try {
@@ -453,12 +482,21 @@ public class PutHive3Streaming extends AbstractProcessor {
                         throw new RecordReaderFactoryException("Unable to create RecordReader", e);
                     }
 
-                    hiveStreamingConnection = makeStreamingConnection(options, reader);
+                    hiveStreamingConnection = makeStreamingConnection(options, reader, recordsPerTransaction);
 
                     // Write records to Hive streaming, then commit and close
-                    hiveStreamingConnection.beginTransaction();
-                    hiveStreamingConnection.write(in);
-                    hiveStreamingConnection.commitTransaction();
+                    boolean exitLoop = false;
+                    while (!exitLoop) {
+                        hiveStreamingConnection.beginTransaction();
+                        // The HiveRecordWriter keeps track of records per transaction and will complete writing for the transaction
+                        // once the limit has been reached. It is then reset for the next iteration of the loop.
+                        try {
+                            hiveStreamingConnection.write(in);
+                        } catch (RecordsEOFException reofe) {
+                            exitLoop = true;
+                        }
+                        hiveStreamingConnection.commitTransaction();
+                    }
                     in.close();
 
                     Map<String, String> updateAttributes = new HashMap<>();
@@ -560,13 +598,14 @@ public class PutHive3Streaming extends AbstractProcessor {
         });
     }
 
-    StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
+    StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader, int recordsPerTransaction) throws StreamingException {
         return HiveStreamingConnection.newBuilder()
                 .withDatabase(options.getDatabaseName())
                 .withTable(options.getTableName())
                 .withStaticPartitionValues(options.getStaticPartitionValues())
                 .withHiveConf(options.getHiveConf())
-                .withRecordWriter(new HiveRecordWriter(reader, getLogger()))
+                .withRecordWriter(new HiveRecordWriter(reader, getLogger(), recordsPerTransaction))
+                .withTransactionBatchSize(options.getTransactionBatchSize())
                 .withAgentInfo("NiFi " + this.getClass().getSimpleName() + " [" + this.getIdentifier()
                         + "] thread " + Thread.currentThread().getId() + "[" + Thread.currentThread().getName() + "]")
                 .connect();
@@ -642,7 +681,7 @@ public class PutHive3Streaming extends AbstractProcessor {
             KerberosUser kerberosUser = kerberosUserReference.get();
             getLogger().debug("kerberosUser is " + kerberosUser);
             try {
-                getLogger().debug("checking TGT on kerberosUser [{}]", new Object[] {kerberosUser});
+                getLogger().debug("checking TGT on kerberosUser [{}]", new Object[]{kerberosUser});
                 kerberosUser.checkTGTAndRelogin();
             } catch (LoginException e) {
                 throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
index 82f6856..7efa106 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -36,6 +36,7 @@ public class HiveOptions implements Serializable {
     protected String kerberosKeytab;
     protected HiveConf hiveConf;
     protected boolean streamingOptimizations = true;
+    protected int transactionBatchSize = 1;
 
     public HiveOptions(String metaStoreURI, String databaseName, String tableName) {
         this.metaStoreURI = metaStoreURI;
@@ -73,6 +74,11 @@ public class HiveOptions implements Serializable {
         return this;
     }
 
+    public HiveOptions withTransactionBatchSize(int transactionBatchSize) {
+        this.transactionBatchSize = transactionBatchSize;
+        return this;
+    }
+
     public String getMetaStoreURI() {
         return metaStoreURI;
     }
@@ -108,4 +114,8 @@ public class HiveOptions implements Serializable {
     public boolean getStreamingOptimizations() {
         return streamingOptimizations;
     }
+
+    public int getTransactionBatchSize() {
+        return transactionBatchSize;
+    }
 }
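
The new option follows the class's existing fluent-builder style; a usage
sketch mirroring how PutHive3Streaming wires it through (values illustrative):

    HiveOptions options = new HiveOptions(metastoreURIs, dbName, tableName)
            .withStreamingOptimizations(true)
            .withTransactionBatchSize(10);          // defaults to 1 if not set
    int batchSize = options.getTransactionBatchSize();  // handed to HiveStreamingConnection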
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
index 2b6487e..05e44fb 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
@@ -1142,7 +1142,7 @@ public class TestPutHive3Streaming {
         }
 
         @Override
-        StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
+        StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader, int recordsPerTransaction) throws StreamingException {
 
             // Test here to ensure the 'hive.metastore.uris' property matches the options.getMetastoreUri() value (if it is set)
             String userDefinedMetastoreURI = options.getMetaStoreURI();
@@ -1154,7 +1154,7 @@ public class TestPutHive3Streaming {
                 throw new StubConnectionError("Unit Test - Connection Error");
             }
 
-            HiveRecordWriter hiveRecordWriter = new HiveRecordWriter(reader, getLogger());
+            HiveRecordWriter hiveRecordWriter = new HiveRecordWriter(reader, getLogger(), 0);
             if (generatePermissionsFailure) {
                 throw new StreamingException("Permission denied");
             }