You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/05/05 17:26:15 UTC
nifi git commit: NIFI-3818: PutHiveStreaming throws IllegalStateException
Repository: nifi
Updated Branches:
refs/heads/master 85405dae1 -> af6f63691
NIFI-3818: PutHiveStreaming throws IllegalStateException
Changed from async append to sync append, because the async append breaks the 'recursionSet' check in StandardProcessSession by updating the set from multiple threads, which results in an IllegalStateException.
This closes #1761.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/af6f6369
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/af6f6369
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/af6f6369
Branch: refs/heads/master
Commit: af6f63691cdeee802da7f6d9ddf8b21b2bc40760
Parents: 85405da
Author: Koji Kawamura <ij...@apache.org>
Authored: Fri May 5 18:33:38 2017 +0900
Committer: Bryan Bende <bb...@apache.org>
Committed: Fri May 5 13:25:59 2017 -0400
----------------------------------------------------------------------
.../nifi/processors/hive/PutHiveStreaming.java | 120 +++++++------------
1 file changed, 45 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/af6f6369/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index 2754f9c..e7d85cd 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -21,6 +21,7 @@ import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
@@ -64,6 +65,7 @@ import org.apache.nifi.util.hive.HiveOptions;
import org.apache.nifi.util.hive.HiveUtils;
import org.apache.nifi.util.hive.HiveWriter;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -75,8 +77,6 @@ import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -85,7 +85,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
-import java.util.function.Function;
import java.util.regex.Pattern;
/**
@@ -383,16 +382,13 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
private AtomicReference<FlowFile> failureFlowFile;
private final DataFileWriter<GenericRecord> successAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
private final DataFileWriter<GenericRecord> failureAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
+ private byte[] successAvroHeader;
+ private byte[] failureAvroHeader;
private final AtomicInteger recordCount = new AtomicInteger(0);
private final AtomicInteger successfulRecordCount = new AtomicInteger(0);
private final AtomicInteger failedRecordCount = new AtomicInteger(0);
- private volatile ExecutorService appendRecordThreadPool;
- private volatile AtomicBoolean closed = new AtomicBoolean(false);
- private final BlockingQueue<List<HiveStreamingRecord>> successRecordQueue = new ArrayBlockingQueue<>(100);
- private final BlockingQueue<List<HiveStreamingRecord>> failureRecordQueue = new ArrayBlockingQueue<>(100);
-
private final ComponentLog logger;
/**
@@ -412,9 +408,8 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
this.failureFlowFile = new AtomicReference<>(failureFlowFile);
}
- private void initAvroWriter(ProcessSession session, String codec, DataFileStream<GenericRecord> reader,
- DataFileWriter<GenericRecord> writer, AtomicReference<FlowFile> flowFileRef,
- BlockingQueue<List<HiveStreamingRecord>> queue, Function<Integer, Boolean> isCompleted) {
+ private byte[] initAvroWriter(ProcessSession session, String codec, DataFileStream<GenericRecord> reader,
+ DataFileWriter<GenericRecord> writer, AtomicReference<FlowFile> flowFileRef) {
writer.setCodec(CodecFactory.fromString(codec));
// Transfer metadata (this is a subset of the incoming file)
@@ -424,71 +419,59 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
}
}
- appendRecordThreadPool.submit(() -> {
- flowFileRef.set(session.append(flowFileRef.get(), (out) -> {
- // Create writer so that records can be appended.
- writer.create(reader.getSchema(), out);
-
- try {
- int writtenCount = 0;
- while (true) {
-
- if (closed.get() && isCompleted.apply(writtenCount)) {
- break;
- }
+ final ByteArrayOutputStream avroHeader = new ByteArrayOutputStream();
+ flowFileRef.set(session.append(flowFileRef.get(), (out) -> {
+ // Create writer so that records can be appended later.
+ writer.create(reader.getSchema(), avroHeader);
+ writer.close();
- final List<HiveStreamingRecord> hRecords = queue.poll(100, TimeUnit.MILLISECONDS);
- if (hRecords != null) {
- try {
- for (HiveStreamingRecord hRecord : hRecords) {
- writer.append(hRecord.getRecord());
- writtenCount++;
- }
- } catch (IOException ioe) {
- // The records were put to Hive Streaming successfully, but there was an error while writing the
- // Avro records to the flow file. Log as an error and move on.
- logger.error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file, " + ioe, ioe);
- }
- }
- }
- writer.flush();
- } catch (InterruptedException e) {
- logger.warn("Append record thread is interrupted, " + e, e);
- }
+ final byte[] header = avroHeader.toByteArray();
+ out.write(header);
+ }));
- }));
- });
+ // Capture the Avro header byte array that is just written to the FlowFile.
+ // This is needed when Avro records are appended to the same FlowFile.
+ return avroHeader.toByteArray();
}
private void initAvroWriters(ProcessSession session, String codec, DataFileStream<GenericRecord> reader) {
- appendRecordThreadPool = Executors.newFixedThreadPool(2);
- initAvroWriter(session, codec, reader, successAvroWriter, successFlowFile, successRecordQueue, w -> w == successfulRecordCount.get());
- initAvroWriter(session, codec, reader, failureAvroWriter, failureFlowFile, failureRecordQueue, w -> w == failedRecordCount.get());
+ successAvroHeader = initAvroWriter(session, codec, reader, successAvroWriter, successFlowFile);
+ failureAvroHeader = initAvroWriter(session, codec, reader, failureAvroWriter, failureFlowFile);
+ }
+
+ private void appendAvroRecords(ProcessSession session, byte[] avroHeader, DataFileWriter<GenericRecord> writer,
+ AtomicReference<FlowFile> flowFileRef, List<HiveStreamingRecord> hRecords) {
- // No new task.
- appendRecordThreadPool.shutdown();
+ flowFileRef.set(session.append(flowFileRef.get(), (out) -> {
+ if (hRecords != null) {
+ // Initialize the writer again as append mode, so that Avro header is written only once.
+ writer.appendTo(new SeekableByteArrayInput(avroHeader), out);
+ try {
+ for (HiveStreamingRecord hRecord : hRecords) {
+ writer.append(hRecord.getRecord());
+ }
+ } catch (IOException ioe) {
+ // The records were put to Hive Streaming successfully, but there was an error while writing the
+ // Avro records to the flow file. Log as an error and move on.
+ logger.error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file, " + ioe, ioe);
+ }
+ }
+ writer.close();
+ }));
}
- private void appendRecordsToSuccess(List<HiveStreamingRecord> records) {
- appendRecordsToFlowFile(records, successRecordQueue);
+ private void appendRecordsToSuccess(ProcessSession session, List<HiveStreamingRecord> records) {
+ appendAvroRecords(session, successAvroHeader, successAvroWriter, successFlowFile, records);
successfulRecordCount.addAndGet(records.size());
}
- private void appendRecordsToFailure(List<HiveStreamingRecord> records) {
- appendRecordsToFlowFile(records, failureRecordQueue);
+ private void appendRecordsToFailure(ProcessSession session, List<HiveStreamingRecord> records) {
+ appendAvroRecords(session, failureAvroHeader, failureAvroWriter, failureFlowFile, records);
failedRecordCount.addAndGet(records.size());
}
- private void appendRecordsToFlowFile(List<HiveStreamingRecord> records, BlockingQueue<List<HiveStreamingRecord>> queue) {
- if (!queue.add(records)) {
- throw new ProcessException(String.format("Failed to append %d records due to insufficient internal queue capacity.", records.size()));
- }
- }
-
private void transferFlowFiles(ProcessSession session, RoutingResult result, String transitUri) {
- closeAvroWriters();
-
if (successfulRecordCount.get() > 0) {
// Transfer the flow file with successful records
successFlowFile.set(
@@ -513,19 +496,6 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
});
}
- private void closeAvroWriters() {
- closed.set(true);
- if (appendRecordThreadPool != null) {
- // Having null thread pool means the input FlowFile was not processed at all, due to illegal format.
- try {
- if (!appendRecordThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
- logger.warn("Waiting for Avro records being appended into output FlowFiles has been timeout.");
- }
- } catch (InterruptedException e) {
- logger.warn("Waiting for Avro records being appended into output FlowFiles has been interrupted.");
- }
- }
- }
}
private static class ShouldRetryException extends RuntimeException {
@@ -545,7 +515,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
case Failure:
// Add the failed record to the failure flow file
getLogger().error(String.format("Error writing %s to Hive Streaming transaction due to %s", input, e), e);
- fc.appendRecordsToFailure(input);
+ fc.appendRecordsToFailure(session, input);
break;
case Retry:
@@ -670,7 +640,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
Runnable flushSuccessfulRecords = () -> {
// Now send the records to the successful FlowFile and update the success count
- functionContext.appendRecordsToSuccess(successfulRecords.get());
+ functionContext.appendRecordsToSuccess(session, successfulRecords.get());
// Clear the list of successful records, we'll use it at the end when we flush whatever records are left
successfulRecords.set(new ArrayList<>());
};