Posted to commits@nifi.apache.org by bb...@apache.org on 2017/05/05 17:26:15 UTC

nifi git commit: NIFI-3818: PutHiveStreaming throws IllegalStateException

Repository: nifi
Updated Branches:
  refs/heads/master 85405dae1 -> af6f63691


NIFI-3818: PutHiveStreaming throws IllegalStateException

Changed from async append to sync append, because the async approach breaks the 'recursionSet' check in StandardProcessSession by updating it from multiple threads, resulting in an IllegalStateException (see the sketch below).

This closes #1761.

Signed-off-by: Bryan Bende <bb...@apache.org>
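
The gist of the fix, as a minimal standalone sketch (class, method, and variable names here are illustrative, not the processor's actual fields): the Avro container header is written to the FlowFile once and captured as a byte array, and each later batch of records is appended on the calling thread by re-opening the DataFileWriter in append mode against that captured header, so session.append() is only ever invoked from the thread that owns the ProcessSession.

    import org.apache.avro.Schema;
    import org.apache.avro.file.CodecFactory;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.file.SeekableByteArrayInput;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.List;

    // Hypothetical sketch of the synchronous append pattern adopted by this commit.
    class SyncAvroAppendSketch {

        private final DataFileWriter<GenericRecord> writer =
                new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
        private byte[] avroHeader;

        // Write the Avro header to the destination once and keep a copy for later appends.
        void writeHeader(Schema schema, String codec, OutputStream out) throws IOException {
            writer.setCodec(CodecFactory.fromString(codec));
            final ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
            writer.create(schema, headerBytes);
            writer.close();
            avroHeader = headerBytes.toByteArray();
            out.write(avroHeader);
        }

        // Append a batch of records on the calling thread; no background thread is involved.
        void appendRecords(List<GenericRecord> records, OutputStream out) throws IOException {
            // Re-open the writer in append mode so the header is written only once.
            writer.appendTo(new SeekableByteArrayInput(avroHeader), out);
            for (GenericRecord record : records) {
                writer.append(record);
            }
            writer.close();
        }
    }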


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/af6f6369
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/af6f6369
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/af6f6369

Branch: refs/heads/master
Commit: af6f63691cdeee802da7f6d9ddf8b21b2bc40760
Parents: 85405da
Author: Koji Kawamura <ij...@apache.org>
Authored: Fri May 5 18:33:38 2017 +0900
Committer: Bryan Bende <bb...@apache.org>
Committed: Fri May 5 13:25:59 2017 -0400

----------------------------------------------------------------------
 .../nifi/processors/hive/PutHiveStreaming.java  | 120 +++++++------------
 1 file changed, 45 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/af6f6369/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index 2754f9c..e7d85cd 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -21,6 +21,7 @@ import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableByteArrayInput;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
@@ -64,6 +65,7 @@ import org.apache.nifi.util.hive.HiveOptions;
 import org.apache.nifi.util.hive.HiveUtils;
 import org.apache.nifi.util.hive.HiveWriter;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -75,8 +77,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -85,7 +85,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
-import java.util.function.Function;
 import java.util.regex.Pattern;
 
 /**
@@ -383,16 +382,13 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         private AtomicReference<FlowFile> failureFlowFile;
         private final DataFileWriter<GenericRecord> successAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
         private final DataFileWriter<GenericRecord> failureAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
+        private byte[] successAvroHeader;
+        private byte[] failureAvroHeader;
 
         private final AtomicInteger recordCount = new AtomicInteger(0);
         private final AtomicInteger successfulRecordCount = new AtomicInteger(0);
         private final AtomicInteger failedRecordCount = new AtomicInteger(0);
 
-        private volatile ExecutorService appendRecordThreadPool;
-        private volatile AtomicBoolean closed = new AtomicBoolean(false);
-        private final BlockingQueue<List<HiveStreamingRecord>> successRecordQueue = new ArrayBlockingQueue<>(100);
-        private final BlockingQueue<List<HiveStreamingRecord>> failureRecordQueue = new ArrayBlockingQueue<>(100);
-
         private final ComponentLog logger;
 
         /**
@@ -412,9 +408,8 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             this.failureFlowFile = new AtomicReference<>(failureFlowFile);
         }
 
-        private void initAvroWriter(ProcessSession session, String codec, DataFileStream<GenericRecord> reader,
-                                    DataFileWriter<GenericRecord> writer, AtomicReference<FlowFile> flowFileRef,
-                                    BlockingQueue<List<HiveStreamingRecord>> queue, Function<Integer, Boolean> isCompleted) {
+        private byte[] initAvroWriter(ProcessSession session, String codec, DataFileStream<GenericRecord> reader,
+                                             DataFileWriter<GenericRecord> writer, AtomicReference<FlowFile> flowFileRef) {
 
             writer.setCodec(CodecFactory.fromString(codec));
             // Transfer metadata (this is a subset of the incoming file)
@@ -424,71 +419,59 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
                 }
             }
 
-            appendRecordThreadPool.submit(() -> {
-                flowFileRef.set(session.append(flowFileRef.get(), (out) -> {
-                    // Create writer so that records can be appended.
-                    writer.create(reader.getSchema(), out);
-
-                    try {
-                        int writtenCount = 0;
-                        while (true) {
-
-                            if (closed.get() && isCompleted.apply(writtenCount)) {
-                                break;
-                            }
+            final ByteArrayOutputStream avroHeader = new ByteArrayOutputStream();
+            flowFileRef.set(session.append(flowFileRef.get(), (out) -> {
+                // Create writer so that records can be appended later.
+                writer.create(reader.getSchema(), avroHeader);
+                writer.close();
 
-                            final List<HiveStreamingRecord> hRecords = queue.poll(100, TimeUnit.MILLISECONDS);
-                            if (hRecords != null) {
-                                try {
-                                    for (HiveStreamingRecord hRecord : hRecords) {
-                                        writer.append(hRecord.getRecord());
-                                        writtenCount++;
-                                    }
-                                } catch (IOException ioe) {
-                                    // The records were put to Hive Streaming successfully, but there was an error while writing the
-                                    // Avro records to the flow file. Log as an error and move on.
-                                    logger.error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file, " + ioe, ioe);
-                                }
-                            }
-                        }
-                        writer.flush();
-                    } catch (InterruptedException e) {
-                        logger.warn("Append record thread is interrupted, " + e, e);
-                    }
+                final byte[] header = avroHeader.toByteArray();
+                out.write(header);
+            }));
 
-                }));
-            });
+            // Capture the Avro header byte array that is just written to the FlowFile.
+            // This is needed when Avro records are appended to the same FlowFile.
+            return avroHeader.toByteArray();
         }
 
         private void initAvroWriters(ProcessSession session, String codec, DataFileStream<GenericRecord> reader) {
-            appendRecordThreadPool = Executors.newFixedThreadPool(2);
-            initAvroWriter(session, codec, reader, successAvroWriter, successFlowFile, successRecordQueue, w -> w == successfulRecordCount.get());
-            initAvroWriter(session, codec, reader, failureAvroWriter, failureFlowFile, failureRecordQueue, w -> w == failedRecordCount.get());
+            successAvroHeader = initAvroWriter(session, codec, reader, successAvroWriter, successFlowFile);
+            failureAvroHeader = initAvroWriter(session, codec, reader, failureAvroWriter, failureFlowFile);
+        }
+
+        private void appendAvroRecords(ProcessSession session, byte[] avroHeader, DataFileWriter<GenericRecord> writer,
+                                       AtomicReference<FlowFile> flowFileRef, List<HiveStreamingRecord> hRecords) {
 
-            // No new task.
-            appendRecordThreadPool.shutdown();
+            flowFileRef.set(session.append(flowFileRef.get(), (out) -> {
+                if (hRecords != null) {
+                    // Initialize the writer again as append mode, so that Avro header is written only once.
+                    writer.appendTo(new SeekableByteArrayInput(avroHeader), out);
+                    try {
+                        for (HiveStreamingRecord hRecord : hRecords) {
+                            writer.append(hRecord.getRecord());
+                        }
+                    } catch (IOException ioe) {
+                        // The records were put to Hive Streaming successfully, but there was an error while writing the
+                        // Avro records to the flow file. Log as an error and move on.
+                        logger.error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file, " + ioe, ioe);
+                    }
+                }
+                writer.close();
+            }));
         }
 
-        private void appendRecordsToSuccess(List<HiveStreamingRecord> records) {
-            appendRecordsToFlowFile(records, successRecordQueue);
+        private void appendRecordsToSuccess(ProcessSession session, List<HiveStreamingRecord> records) {
+            appendAvroRecords(session, successAvroHeader, successAvroWriter, successFlowFile, records);
             successfulRecordCount.addAndGet(records.size());
         }
 
-        private void appendRecordsToFailure(List<HiveStreamingRecord> records) {
-            appendRecordsToFlowFile(records, failureRecordQueue);
+        private void appendRecordsToFailure(ProcessSession session, List<HiveStreamingRecord> records) {
+            appendAvroRecords(session, failureAvroHeader, failureAvroWriter, failureFlowFile, records);
             failedRecordCount.addAndGet(records.size());
         }
 
-        private void appendRecordsToFlowFile(List<HiveStreamingRecord> records, BlockingQueue<List<HiveStreamingRecord>> queue) {
-            if (!queue.add(records)) {
-                throw new ProcessException(String.format("Failed to append %d records due to insufficient internal queue capacity.", records.size()));
-            }
-        }
-
         private void transferFlowFiles(ProcessSession session, RoutingResult result, String transitUri) {
 
-            closeAvroWriters();
-
             if (successfulRecordCount.get() > 0) {
                 // Transfer the flow file with successful records
                 successFlowFile.set(
@@ -513,19 +496,6 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             });
         }
 
-        private void closeAvroWriters() {
-            closed.set(true);
-            if (appendRecordThreadPool != null) {
-                // Having null thread pool means the input FlowFile was not processed at all, due to illegal format.
-                try {
-                    if (!appendRecordThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
-                        logger.warn("Waiting for Avro records being appended into output FlowFiles has been timeout.");
-                    }
-                } catch (InterruptedException e) {
-                    logger.warn("Waiting for Avro records being appended into output FlowFiles has been interrupted.");
-                }
-            }
-        }
     }
 
     private static class ShouldRetryException extends RuntimeException {
@@ -545,7 +515,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
                 case Failure:
                     // Add the failed record to the failure flow file
                     getLogger().error(String.format("Error writing %s to Hive Streaming transaction due to %s", input, e), e);
-                    fc.appendRecordsToFailure(input);
+                    fc.appendRecordsToFailure(session, input);
                     break;
 
                 case Retry:
@@ -670,7 +640,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
 
                     Runnable flushSuccessfulRecords = () -> {
                         // Now send the records to the successful FlowFile and update the success count
-                        functionContext.appendRecordsToSuccess(successfulRecords.get());
+                        functionContext.appendRecordsToSuccess(session, successfulRecords.get());
                         // Clear the list of successful records, we'll use it at the end when we flush whatever records are left
                         successfulRecords.set(new ArrayList<>());
                     };