You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/11/11 22:05:57 UTC

[1/4] nifi git commit: NIFI-994: Initial import of TailFile

Repository: nifi
Updated Branches:
  refs/heads/master 854d20398 -> 7a165b62c


NIFI-994: Initial import of TailFile


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/31f0909b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/31f0909b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/31f0909b

Branch: refs/heads/master
Commit: 31f0909bd315af43936b844327454ba2c48611e4
Parents: 49ee06b
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Oct 18 15:16:56 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Oct 18 19:00:24 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/TailFile.java      | 661 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../nifi/processors/standard/TestTailFile.java  | 205 ++++++
 .../src/test/resources/logback-test.xml         |  18 +
 4 files changed, 885 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/31f0909b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
new file mode 100644
index 0000000..2fe9431
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.LongHolder;
+
+// note: it is important that this Processor is not marked as @SupportsBatching because the session commits must complete before persisting state locally; otherwise, data loss may occur
+@TriggerSerially
+@Tags({"tail", "file", "log", "text", "source"})
+@CapabilityDescription("\"Tails\" a file, ingesting data from the file as it is written to the file. The file is expected to be textual. Data is ingested only when a "
+    + "new line is encountered (carriage return or new-line character or combination). If the file to tail is periodically \"rolled over\", as is generally the case "
+    + "with log files, an optional Rolling Filename Pattern can be used to retrieve data from files that have rolled over, even if the rollover occurred while NiFi "
+    + "was not running (provided that the data still exists upon restart of NiFi).")
+public class TailFile extends AbstractProcessor {
+
+    // Required property: path of the file to tail. Validated with FILE_EXISTS_VALIDATOR; Expression Language is disabled.
+    static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
+        .name("File to Tail")
+        .description("Fully-qualified filename of the file that should be tailed")
+        .expressionLanguageSupported(false)
+        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+        .required(true)
+        .build();
+    // Optional property: glob-style pattern used to locate rolled-over copies of the tailed file in the same directory.
+    static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new PropertyDescriptor.Builder()
+        .name("Rolling Filename Pattern")
+        .description("If the file to tail \"rolls over\" as would be the case with log files, this filename pattern will be used to "
+            + "identify files that have rolled over so that if NiFi is restarted, and the file has rolled over, it will be able to pick up where it left off. "
+            + "This pattern supports wildcard characters * and ? and will assume that the files that have rolled over live in the same directory as the file being tailed.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(false)
+        .build();
+    // Required property: location of the local file in which the tailing position is persisted.
+    // No default is declared here; getSupportedPropertyDescriptors() derives one that includes the processor identifier.
+    static final PropertyDescriptor STATE_FILE = new PropertyDescriptor.Builder()
+        .name("State File")
+        .description("Specifies the file that should be used for storing state about what data has been ingested so that upon restart NiFi can resume from where it left off")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+
+    // The only relationship of this processor; every FlowFile it creates is transferred here.
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles are routed to this Relationship.")
+        .build();
+
+    // Current tailing state (file, reader, position, timestamp, checksum and a 64 KB read buffer).
+    // TailFileState is immutable, so the field is replaced wholesale whenever the state changes.
+    private volatile TailFileState state = new TailFileState(null, null, null, 0L, 0L, null, new byte[65536]);
+    // Set to false when the processor is scheduled or a relevant property changes; onTrigger performs the
+    // rolled-file recovery once and then sets it to true.
+    private volatile boolean recoveredRolledFiles = false;
+    // Checksum read from the state file by recoverState(), or null when the state file held none that applies
+    // to the currently configured file.
+    private volatile Long expectedRecoveryChecksum;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(FILENAME);
+        properties.add(ROLLING_FILENAME_PATTERN);
+        properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(STATE_FILE).defaultValue("./conf/state/" + getIdentifier()).build());
+        return properties;
+    }
+
+    /**
+     * @return an immutable single-element set containing only the success relationship
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> onlySuccess = Collections.singleton(REL_SUCCESS);
+        return onlySuccess;
+    }
+
+    /**
+     * Reacts to configuration changes. Pointing the processor at a different file discards the in-memory
+     * tailing state (position and timestamp go back to zero); changing either the file or the rolling
+     * filename pattern forces rolled-file recovery to run again on the next trigger.
+     *
+     * @param descriptor the property that was modified
+     * @param oldValue the previous value of the property (unused)
+     * @param newValue the new value of the property
+     */
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        final boolean tailedFileChanged = FILENAME.equals(descriptor);
+        if (tailedFileChanged) {
+            // start over with a fresh state and a fresh read buffer for the newly configured file
+            state = new TailFileState(newValue, null, null, 0L, 0L, null, new byte[65536]);
+        }
+
+        if (tailedFileChanged || ROLLING_FILENAME_PATTERN.equals(descriptor)) {
+            recoveredRolledFiles = false;
+        }
+    }
+
+    /**
+     * Restores the tailing state from the configured State File when the processor is scheduled.
+     *
+     * <p>
+     * The state file holds an encoding version, the tailed filename, the byte position reached, a timestamp and,
+     * optionally, a CRC32 checksum of the bytes consumed so far. If the persisted filename matches the configured
+     * file and a checksum is present, the first <code>position</code> bytes of the existing file are checksummed.
+     * On a match, tailing resumes at that position with the verified checksum carried forward. On a mismatch
+     * (e.g. the file rolled over while stopped), the position and timestamp are kept so that
+     * {@link #recoverRolledFiles} can locate the rolled-over file. If the state file does not exist, the
+     * in-memory state is left untouched.
+     * </p>
+     *
+     * @param context the ProcessContext to use in order to obtain Processor configuration
+     * @throws IOException if the state file or the tailed file cannot be read, or if the state file was written
+     *             with an unsupported encoding version
+     */
+    @OnScheduled
+    public void recoverState(final ProcessContext context) throws IOException {
+        recoveredRolledFiles = false;
+
+        final String tailFilename = context.getProperty(FILENAME).getValue();
+        final String stateFilename = context.getProperty(STATE_FILE).getValue();
+
+        final File stateFile = new File(stateFilename);
+        try (final FileInputStream fis = new FileInputStream(stateFile);
+            final DataInputStream dis = new DataInputStream(fis)) {
+
+            final int encodingVersion = dis.readInt();
+            if (encodingVersion > 0) {
+                throw new IOException("Unable to recover state because State File was encoded in a more recent version than Version 1");
+            }
+
+            if (encodingVersion == 0) {
+                final String filename = dis.readUTF();
+                final long position = dis.readLong();
+                final long timestamp = dis.readLong();
+                final boolean checksumPresent = dis.readBoolean();
+
+                RandomAccessFile reader = null;
+                File tailFile = null;
+                Checksum checksum = null;
+
+                if (checksumPresent && tailFilename.equals(filename)) {
+                    expectedRecoveryChecksum = dis.readLong();
+
+                    // We have an expected checksum and the currently configured filename is the same as the state file.
+                    // We need to check if the existing file is the same as the one referred to in the state file based on
+                    // the checksum.
+                    final File existingTailFile = new File(filename);
+                    if (existingTailFile.length() >= position) {
+                        try (final InputStream tailFileIs = new FileInputStream(existingTailFile);
+                            final CheckedInputStream in = new CheckedInputStream(tailFileIs, new CRC32())) {
+                            // Checksum exactly the bytes that had been consumed when the state was persisted. The position
+                            // must come from the state file; the in-memory state has not been restored yet at this point.
+                            StreamUtils.copy(in, new NullOutputStream(), position);
+
+                            final long checksumResult = in.getChecksum().getValue();
+                            if (checksumResult == expectedRecoveryChecksum) {
+                                tailFile = existingTailFile;
+                                reader = new RandomAccessFile(tailFile, "r");
+                                reader.seek(position);
+
+                                // Carry the verified checksum forward so that the next persisted checksum still covers
+                                // every byte from the start of the file up to the persisted position.
+                                checksum = in.getChecksum();
+                            }
+                        }
+                    }
+
+                    state = new TailFileState(tailFilename, tailFile, reader, position, timestamp, checksum, new byte[65536]);
+                } else {
+                    expectedRecoveryChecksum = null;
+                    // tailing a new file since the state file was written out. We will reset state.
+                    state = new TailFileState(tailFilename, null, null, 0L, 0L, null, new byte[65536]);
+                }
+            } else {
+                // encoding Version == -1... no data in file. Just move on.
+            }
+        } catch (final FileNotFoundException fnfe) {
+            // No state has been persisted yet (e.g. first run of this processor), so there is nothing to recover.
+            getLogger().debug("State file {} does not exist; no state to recover", new Object[] {stateFilename});
+        }
+    }
+
+
+    /**
+     * Writes the given state to the state file, creating the parent directory if necessary. The layout is:
+     * encoding version (int, currently 0), filename (UTF), position (long), timestamp (long), a boolean telling
+     * whether a checksum follows and, if so, the checksum value (long). This is the format read back by
+     * {@link #recoverState}.
+     *
+     * @param state the state to persist
+     * @param stateFilename the name of the file to write the state to
+     * @throws IOException if the state file cannot be written
+     */
+    public void persistState(final TailFileState state, final String stateFilename) throws IOException {
+        final File stateFile = new File(stateFilename);
+        File directory = stateFile.getParentFile();
+        if (directory != null && !directory.exists() && !directory.mkdirs()) {
+            // best effort: not being able to persist state only risks duplicate data after a restart, so warn and carry on
+            getLogger().warn("Failed to persist state to {} because the parent directory does not exist and could not be created. This may result in data being duplicated upon restart of NiFi",
+                new Object[] {stateFilename});
+            return;
+        }
+        try (final FileOutputStream fos = new FileOutputStream(stateFile);
+            final DataOutputStream dos = new DataOutputStream(fos)) {
+
+            dos.writeInt(0); // version
+            dos.writeUTF(state.getFilename());
+            dos.writeLong(state.getPosition());
+            dos.writeLong(state.getTimestamp());
+            if (state.getChecksum() == null) {
+                dos.writeBoolean(false);
+            } else {
+                dos.writeBoolean(true);
+                dos.writeLong(state.getChecksum().getValue());
+            }
+        }
+    }
+
+    /**
+     * Opens the given file read-only and positions it at the requested offset.
+     *
+     * @param file the file to open
+     * @param position the byte offset to seek to once the file is open
+     * @return a reader positioned at the given offset, or <code>null</code> if the file could not be opened or the seek failed
+     */
+    private RandomAccessFile createReader(final File file, final long position) {
+        final RandomAccessFile raf;
+        try {
+            raf = new RandomAccessFile(file, "r");
+        } catch (final FileNotFoundException notFound) {
+            getLogger().debug("File {} does not exist; yielding and returning", new Object[] {file});
+            return null;
+        }
+
+        try {
+            raf.seek(position);
+            return raf;
+        } catch (final IOException seekFailure) {
+            getLogger().error("Failed to read from {} due to {}", new Object[] {file, seekFailure});
+        }
+
+        // the seek failed, so release the file handle; a failure to close it is deliberately ignored
+        try {
+            raf.close();
+        } catch (final IOException ignored) {
+        }
+
+        return null;
+    }
+
+    // for testing purposes
+    TailFileState getState() {
+        return state;
+    }
+
+
+
+    /**
+     * Finds any files that have rolled over and have not yet been ingested by this Processor. Each of these files that is found will be
+     * ingested as its own FlowFile. If a file is found that has been partially ingested, the rest of the file will be ingested as a
+     * single FlowFile but the data that already has been ingested will not be ingested again. If nothing remains to be ingested from
+     * that partially ingested file, no FlowFile is emitted for it.
+     *
+     * <p>
+     * After each file is handled the session is committed and only then is the state persisted, so that a failure in between
+     * can cause duplicate data but never data loss. I/O failures are logged and end the recovery; they are not propagated.
+     * </p>
+     *
+     * @param context the ProcessContext to use in order to obtain Processor configuration
+     * @param session the ProcessSession to use in order to interact with FlowFile creation and content.
+     */
+    private void recoverRolledFiles(final ProcessContext context, final ProcessSession session) {
+        try {
+            // Find all files that match our rollover pattern, if any, and order them based on their timestamp and filename.
+            // Ignore any file that has a timestamp earlier than the state that we have persisted. If we were reading from
+            // a file when we stopped running, then that file that we were reading from should be the first file in this list,
+            // assuming that the file still exists on the file system.
+            final List<File> rolledOffFiles = getRolledOffFiles(context, state.getTimestamp());
+
+            // For first file that we find, it may or may not be the file that we were last reading from.
+            // As a result, we have to read up to the position we stored, while calculating the checksum. If the checksums match,
+            // then we know we've already processed this file. If the checksums do not match, then we have not
+            // processed this file and we need to seek back to position 0 and ingest the entire file.
+            // For all other files that have been rolled over, we need to just ingest the entire file.
+            if (!rolledOffFiles.isEmpty() && expectedRecoveryChecksum != null && rolledOffFiles.get(0).length() >= state.getPosition()) {
+                final File firstFile = rolledOffFiles.get(0);
+
+                final long startNanos = System.nanoTime();
+                try (final InputStream fis = new FileInputStream(firstFile);
+                    final CheckedInputStream in = new CheckedInputStream(fis, new CRC32())) {
+                    StreamUtils.copy(in, new NullOutputStream(), state.getPosition());
+
+                    final long checksumResult = in.getChecksum().getValue();
+                    if (checksumResult == expectedRecoveryChecksum) {
+                        // This is the same file that we were reading when we shutdown. Start reading from this point on.
+                        rolledOffFiles.remove(0);
+                        FlowFile flowFile = session.create();
+                        flowFile = session.importFrom(in, flowFile);
+
+                        if (flowFile.getSize() == 0L) {
+                            // Everything in this file had already been ingested before shutdown; do not emit an empty FlowFile.
+                            session.remove(flowFile);
+                        } else {
+                            flowFile = session.putAttribute(flowFile, "filename", firstFile.getName());
+
+                            // The stream was already advanced past the consumed bytes, so the FlowFile holds the remainder of the file.
+                            session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), "FlowFile contains bytes " + state.getPosition() + " through "
+                                + firstFile.length() + " of source file", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+                            session.transfer(flowFile, REL_SUCCESS);
+                        }
+
+                        // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
+                        state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer());
+
+                        // must ensure that we do session.commit() before persisting state in order to avoid data loss.
+                        session.commit();
+                        persistState(state, context.getProperty(STATE_FILE).getValue());
+                    }
+                }
+            }
+
+            // For each file that we found that matches our Rollover Pattern, and has a last modified date later than the timestamp
+            // that we recovered from the state file, we need to consume the entire file. The only exception to this is the file that
+            // we were reading when we last stopped, as it may already have been partially consumed. That is taken care of in the
+            // above block of code.
+            for (final File file : rolledOffFiles) {
+                FlowFile flowFile = session.create();
+                flowFile = session.importFrom(file.toPath(), true, flowFile);
+                flowFile = session.putAttribute(flowFile, "filename", file.getName());
+                session.getProvenanceReporter().receive(flowFile, file.toURI().toString());
+                session.transfer(flowFile, REL_SUCCESS);
+
+                // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
+                state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, file.lastModified() + 1L, null, state.getBuffer());
+
+                // must ensure that we do session.commit() before persisting state in order to avoid data loss.
+                session.commit();
+                persistState(state, context.getProperty(STATE_FILE).getValue());
+            }
+        } catch (final IOException e) {
+            getLogger().error("Failed to recover files that have rolled over due to {}", new Object[] {e});
+        }
+    }
+
+
+
+    /**
+     * Performs one tailing pass over the configured file.
+     *
+     * <p>
+     * On the first invocation after scheduling, files that rolled over while the processor was stopped are recovered.
+     * The method then detects rotation (file shorter than the stored position), ingesting any unread remainder of the most
+     * recently rolled-over file, and finally streams every complete new line of the tailed file into a FlowFile that is
+     * routed to success. The session is always committed before the state is persisted so that a failure in between can
+     * cause duplicate data but never data loss. When there is nothing to read, the processor yields.
+     * </p>
+     *
+     * @param context the ProcessContext to use in order to obtain Processor configuration
+     * @param session the ProcessSession to use in order to interact with FlowFile creation and content
+     * @throws ProcessException if the framework fails while creating or writing FlowFiles
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        // If this is the first time the processor has run since it was started, we need to check for any files that may have rolled over
+        // while the processor was stopped. If we find any, we need to import them into the flow.
+        if (!recoveredRolledFiles) {
+            recoverRolledFiles(context, session);
+            recoveredRolledFiles = true;
+        }
+
+        // initialize local variables from state object; this is done so that we can easily change the values throughout
+        // the onTrigger method and then create a new state object after we finish processing the files.
+        TailFileState state = this.state;
+        File file = state.getFile();
+        RandomAccessFile reader = state.getReader();
+        Checksum checksum = state.getChecksum();
+        if (checksum == null) {
+            checksum = new CRC32();
+        }
+        long position = state.getPosition();
+        long timestamp = state.getTimestamp();
+
+        // Create a reader if necessary.
+        if (file == null || reader == null) {
+            file = new File(context.getProperty(FILENAME).getValue());
+            reader = createReader(file, position);
+            if (reader == null) {
+                context.yield();
+                return;
+            }
+        }
+
+        final long startNanos = System.nanoTime();
+
+        // Check if file has rotated
+        long fileLength = file.length();
+        if (fileLength < position) {
+            // File has rotated. It's possible that it rotated before we finished reading all of the data. As a result, we need
+            // to check the last rolled-over file and see if it is longer than our position. If so, consume the data past our
+            // marked position.
+            try {
+                final List<File> updatedRolledOverFiles = getRolledOffFiles(context, timestamp);
+                if (!updatedRolledOverFiles.isEmpty()) {
+                    final File lastRolledOver = updatedRolledOverFiles.get(updatedRolledOverFiles.size() - 1);
+
+                    // there is more data in the file that has not yet been consumed.
+                    if (lastRolledOver.length() > state.getPosition()) {
+                        try (final FileInputStream fis = new FileInputStream(lastRolledOver)) {
+                            StreamUtils.skip(fis, state.getPosition());
+
+                            FlowFile flowFile = session.create();
+                            flowFile = session.importFrom(fis, flowFile);
+                            flowFile = session.putAttribute(flowFile, "filename", lastRolledOver.getName());
+
+                            session.getProvenanceReporter().receive(flowFile, lastRolledOver.toURI().toString(), "FlowFile contains bytes " + state.getPosition() + " through " +
+                                lastRolledOver.length() + " of source file", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+                            session.transfer(flowFile, REL_SUCCESS);
+                            this.state = state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, lastRolledOver.lastModified() + 1L, null, state.getBuffer());
+
+                            // must ensure that we do session.commit() before persisting state in order to avoid data loss.
+                            session.commit();
+                            persistState(state, context.getProperty(STATE_FILE).getValue());
+                        }
+                    }
+                }
+            } catch (final IOException ioe) {
+                getLogger().error("File being tailed was rolled over. However, was unable to determine which \"Rollover Files\" exist or read the last one due to {}. "
+                    + "It is possible that data at the end of the last file will be skipped as a result.", new Object[] {ioe});
+            }
+
+
+            // Since file has rotated, we close the reader, create a new one, and then reset our state.
+            try {
+                reader.close();
+            } catch (final IOException ioe) {
+                getLogger().warn("Failed to close reader for {} due to {}", new Object[] {file, ioe});
+            }
+
+            reader = createReader(file, 0L);
+            position = 0L;
+            checksum.reset();
+
+            if (reader == null) {
+                // The replacement file could not be opened (for instance, it has not been created yet). Make sure the stored
+                // state no longer refers to the reader that was just closed, then try again on a later invocation.
+                this.state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, state.getTimestamp(), null, state.getBuffer());
+                context.yield();
+                return;
+            }
+
+            fileLength = file.length();
+        }
+
+        // check if there is any data to consume by checking if file has grown or last modified timestamp has changed.
+        boolean consumeData = false;
+        if (fileLength > position) {
+            consumeData = true;
+        } else if (file.lastModified() > timestamp) {
+            // This can happen if file is truncated, or is replaced with the same amount of data as the old file.
+            position = 0;
+
+            try {
+                reader.seek(0L);
+            } catch (final IOException ioe) {
+                getLogger().error("Failed to seek to beginning of file due to {}", new Object[] {ioe});
+                context.yield();
+                return;
+            }
+
+            // The file is re-read from its beginning, so the checksum must start over as well; otherwise it would
+            // not match the bytes that precede the persisted position when state is recovered.
+            checksum.reset();
+            consumeData = true;
+        }
+
+        // If there is data to consume, read as much as we can.
+        final TailFileState currentState = state;
+        final Checksum chksum = checksum;
+        if (consumeData) {
+            // data has been written to file. Stream it to a new FlowFile.
+            FlowFile flowFile = session.create();
+            final RandomAccessFile fileReader = reader;
+            final LongHolder positionHolder = new LongHolder(position);
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream rawOut) throws IOException {
+                    try (final OutputStream out = new BufferedOutputStream(rawOut)) {
+                        positionHolder.set(readLines(fileReader, currentState.getBuffer(), out, chksum));
+                    }
+                }
+            });
+
+            // If there ended up being no data, just remove the FlowFile
+            if (flowFile.getSize() == 0) {
+                session.remove(flowFile);
+            } else {
+                // determine filename for FlowFile by using <base filename of log file>.<initial offset>-<final offset>.<extension>
+                final String tailFilename = file.getName();
+                final String baseName = StringUtils.substringBeforeLast(tailFilename, ".");
+                final String flowFileName;
+                if (baseName.length() < tailFilename.length()) {
+                    flowFileName = baseName + "." + position + "-" + positionHolder.get() + "." + StringUtils.substringAfterLast(tailFilename, ".");
+                } else {
+                    flowFileName = baseName + "." + position + "-" + positionHolder.get();
+                }
+
+                final Map<String, String> attributes = new HashMap<>(2);
+                attributes.put(CoreAttributes.FILENAME.key(), flowFileName);
+                attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+                flowFile = session.putAllAttributes(flowFile, attributes);
+
+                session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), "FlowFile contains bytes " + position + " through " + positionHolder.get() + " of source file",
+                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+                session.transfer(flowFile, REL_SUCCESS);
+                position = positionHolder.get();
+            }
+        }
+
+        // Create a new state object to represent our current position, timestamp, etc.
+        timestamp = System.currentTimeMillis();
+        final TailFileState updatedState = new TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, timestamp, checksum, state.getBuffer());
+        this.state = updatedState;
+
+        if (!consumeData) {
+            // no data to consume so rather than continually running, yield to allow other processors to use the thread.
+            // In this case, the state should not have changed, and we will have created no FlowFiles, so we don't have to
+            // persist the state or commit the session; instead, just return here.
+            context.yield();
+            return;
+        }
+
+        // We must commit session before persisting state in order to avoid data loss on restart
+        session.commit();
+        final String stateFilename = context.getProperty(STATE_FILE).getValue();
+        try {
+            persistState(updatedState, stateFilename);
+        } catch (final IOException e) {
+            getLogger().warn("Failed to update state file {} due to {}; some data may be duplicated on restart of NiFi", new Object[] {stateFilename, e});
+        }
+    }
+
+
+    /**
+     * Read new lines from the given RandomAccessFile, copying it to the given Output Stream. The Checksum is used in order to later determine whether or not
+     * data has been consumed.
+     *
+     * <p>
+     * Only complete lines are copied: a line ends at a new-line, or at a carriage return once the following byte shows that it is not part of
+     * a CR/LF pair. Any trailing bytes that do not yet form a complete line are neither written nor added to the checksum; the reader is
+     * positioned back at the start of that partial line so that it is read again on the next invocation. The checksum therefore always
+     * covers exactly the bytes that precede the returned position.
+     * </p>
+     *
+     * @param reader The RandomAccessFile to read data from
+     * @param buffer the buffer to use for copying data
+     * @param out the OutputStream to copy the data to
+     * @param checksum the Checksum object to use in order to calculate checksum for recovery purposes
+     *
+     * @return The new position after the lines have been read
+     * @throws java.io.IOException if an I/O error occurs.
+     */
+    private long readLines(final RandomAccessFile reader, final byte[] buffer, final OutputStream out, final Checksum checksum) throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        long pos = reader.getFilePointer();
+        long rePos = pos; // position to re-read
+
+        int num;
+        boolean seenCR = false;
+        while (((num = reader.read(buffer)) != -1)) {
+            for (int i = 0; i < num; i++) {
+                byte ch = buffer[i];
+
+                switch (ch) {
+                    case '\n':
+                        baos.write(ch);
+                        seenCR = false;
+                        flushCompletedLine(baos, out, checksum);
+                        rePos = pos + i + 1;
+                        break;
+                    case '\r':
+                        // do not flush yet: a new-line may follow, in which case both bytes belong to the same line ending
+                        baos.write(ch);
+                        seenCR = true;
+                        break;
+                    default:
+                        if (seenCR) {
+                            // the carriage return stood alone, so it terminated the previous line; this byte starts the next one
+                            seenCR = false;
+                            flushCompletedLine(baos, out, checksum);
+                            baos.write(ch);
+                            rePos = pos + i;
+                        } else {
+                            baos.write(ch);
+                        }
+                }
+            }
+
+            pos = reader.getFilePointer();
+        }
+
+        reader.seek(rePos); // Ensure we can re-read if necessary
+        return rePos;
+    }
+
+    /**
+     * Writes the bytes of a completed line to the output, adds exactly those bytes to the checksum and empties the pending buffer.
+     *
+     * @param pending the bytes accumulated for the line that has just been completed
+     * @param out the OutputStream to copy the line to
+     * @param checksum the Checksum to update with the bytes that were written
+     * @throws IOException if unable to write to the OutputStream
+     */
+    private static void flushCompletedLine(final ByteArrayOutputStream pending, final OutputStream out, final Checksum checksum) throws IOException {
+        final byte[] lineBytes = pending.toByteArray();
+        out.write(lineBytes);
+        checksum.update(lineBytes, 0, lineBytes.length);
+        pending.reset();
+    }
+
+
+    /**
+     * Returns a list of all Files that match the following criteria:
+     *
+     * <ul>
+     * <li>Filename matches the Rolling Filename Pattern</li>
+     * <li>Filename does not match the actual file being tailed</li>
+     * <li>The Last Modified Time on the file is equal to or later than the given minimum timestamp</li>
+     * </ul>
+     *
+     * <p>
+     * The List that is returned will be ordered by file timestamp, providing the oldest file first.
+     * </p>
+     *
+     * @param context the ProcessContext to use in order to determine Processor configuration
+     * @param minTimestamp any file with a Last Modified Time before this timestamp will not be returned
+     * @return a list of all Files that have rolled over
+     * @throws IOException if unable to perform the listing of files
+     */
+    private List<File> getRolledOffFiles(final ProcessContext context, final long minTimestamp) throws IOException {
+        final String tailFilename = context.getProperty(FILENAME).getValue();
+        final File tailFile = new File(tailFilename);
+        File directory = tailFile.getParentFile();
+        if (directory == null) {
+            directory = new File(".");
+        }
+
+        final String rollingPattern = context.getProperty(ROLLING_FILENAME_PATTERN).getValue();
+        if (rollingPattern == null) {
+            return Collections.emptyList();
+        }
+
+        final List<File> rolledOffFiles = new ArrayList<>();
+        final DirectoryStream<Path> dirStream = Files.newDirectoryStream(directory.toPath(), rollingPattern);
+        for (final Path path : dirStream) {
+            final File file = path.toFile();
+            if (file.lastModified() < minTimestamp || file.equals(tailFile)) {
+                continue;
+            }
+
+            rolledOffFiles.add(file);
+        }
+
+        // Sort files based on last modified timestamp. If same timestamp, use filename as a secondary sort, as often
+        // files that are rolled over are given a naming scheme that is lexicographically sort in the same order as the
+        // timestamp, such as yyyy-MM-dd-HH-mm-ss
+        Collections.sort(rolledOffFiles, new Comparator<File>() {
+            @Override
+            public int compare(final File o1, final File o2) {
+                final int lastModifiedComp = Long.compare(o1.lastModified(), o2.lastModified());
+                if (lastModifiedComp != 0) {
+                    return lastModifiedComp;
+                }
+
+                return o1.getName().compareTo(o2.getName());
+            }
+        });
+
+        return rolledOffFiles;
+    }
+
+
+    /**
+     * Immutable snapshot of everything the Processor needs to remember between invocations: which file is being tailed,
+     * the open reader on it, how far it has been read, when it was last read, the running checksum of the consumed
+     * bytes and the buffer reused for reading.
+     */
+    static class TailFileState {
+        // the configured filename is kept in addition to the File so that it can be matched against the user-defined filename on recovery
+        private final String filename;
+        private final File file;
+        private final RandomAccessFile reader;
+        private final long position;
+        private final long timestamp;
+        private final Checksum checksum;
+        private final byte[] buffer;
+
+        /**
+         * @param filename the configured name of the file being tailed
+         * @param file the file being tailed, may be null
+         * @param raf the open reader on the file, may be null
+         * @param position the byte offset up to which the file has been consumed
+         * @param timestamp the time in milliseconds of the last read; stored truncated to whole seconds
+         * @param checksum the checksum of the bytes consumed so far, may be null
+         * @param buffer the buffer to use when reading from the file
+         */
+        public TailFileState(final String filename, final File file, final RandomAccessFile raf, final long position, final long timestamp, final Checksum checksum, final byte[] buffer) {
+            this.filename = filename;
+            this.file = file;
+            this.reader = raf;
+            this.position = position;
+            // many operating systems will use only second-level precision for last-modified times so cut off milliseconds
+            this.timestamp = timestamp - (timestamp % 1000);
+            this.checksum = checksum;
+            this.buffer = buffer;
+        }
+
+        public String getFilename() {
+            return this.filename;
+        }
+
+        public File getFile() {
+            return this.file;
+        }
+
+        public RandomAccessFile getReader() {
+            return this.reader;
+        }
+
+        public long getPosition() {
+            return this.position;
+        }
+
+        public long getTimestamp() {
+            return this.timestamp;
+        }
+
+        public Checksum getChecksum() {
+            return this.checksum;
+        }
+
+        public byte[] getBuffer() {
+            return this.buffer;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/31f0909b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 0ce1456..5bd36db 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -65,6 +65,7 @@ org.apache.nifi.processors.standard.SplitContent
 org.apache.nifi.processors.standard.SplitJson
 org.apache.nifi.processors.standard.SplitText
 org.apache.nifi.processors.standard.SplitXml
+org.apache.nifi.processors.standard.TailFile
 org.apache.nifi.processors.standard.TransformXml
 org.apache.nifi.processors.standard.UnpackContent
 org.apache.nifi.processors.standard.ValidateXml

http://git-wip-us.apache.org/repos/asf/nifi/blob/31f0909b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
new file mode 100644
index 0000000..d282a9d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.nifi.processors.standard.TailFile.TailFileState;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTailFile {
+    private File file;
+    private RandomAccessFile raf;
+    private TestRunner runner;
+
+    @Before
+    public void setup() throws IOException {
+        final File targetDir = new File("target");
+        final File[] files = targetDir.listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(final File dir, final String name) {
+                return name.startsWith("log");
+            }
+        });
+
+        for (final File file : files) {
+            file.delete();
+        }
+
+        file = new File("target/log.txt");
+        file.delete();
+        assertTrue(file.createNewFile());
+
+        final File stateFile = new File("target/tail-file.state");
+        stateFile.delete();
+        Assert.assertFalse(stateFile.exists());
+
+        runner = TestRunners.newTestRunner(new TailFile());
+        runner.setProperty(TailFile.FILENAME, "target/log.txt");
+        runner.setProperty(TailFile.STATE_FILE, "target/tail-file.state");
+        runner.assertValid();
+
+        raf = new RandomAccessFile(file, "rw");
+    }
+
+    @After
+    public void cleanup() throws IOException {
+        if (raf != null) {
+            raf.close();
+        }
+    }
+
+
+    @Test
+    public void testConsumeAfterTruncation() throws IOException {
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        // truncate and then write same number of bytes
+        raf.setLength(0L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+        raf.write("HELLO\n".getBytes());
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("HELLO\n");
+    }
+
+
+    @Test
+    public void testRemainderOfFileRecoveredAfterRestart() throws IOException {
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log*.txt");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+        raf.close();
+        file.renameTo(new File("target/log1.txt"));
+
+        raf = new RandomAccessFile(new File("target/log.txt"), "rw");
+        raf.write("new file\n".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("new file\n");
+    }
+
+
+    @Test
+    public void testRemainderOfFileRecoveredIfRolledOverWhileRunning() throws IOException {
+        // this mimics the case when we are reading a log file that rolls over while processor is running.
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log*.txt");
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+        raf.close();
+        file.renameTo(new File("target/log1.txt"));
+
+        raf = new RandomAccessFile(new File("target/log.txt"), "rw");
+        raf.write("1\n".getBytes());
+        runner.run(1, false, false);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n");
+    }
+
+
+    @Test
+    public void testConsumeWhenNewLineFound() throws IOException, InterruptedException {
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        final long start = System.currentTimeMillis();
+        Thread.sleep(1100L);
+
+        raf.write("Hello, World".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("\r\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+
+        final TailFileState state = ((TailFile) runner.getProcessor()).getState();
+        assertNotNull(state);
+        assertEquals("target/log.txt", state.getFilename());
+        assertTrue(state.getTimestamp() <= System.currentTimeMillis());
+        assertTrue(state.getTimestamp() >= start);
+        assertEquals(14, state.getPosition());
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("Hello, World\r\n");
+
+        runner.clearTransferState();
+
+        raf.write("12345".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("12345\n");
+
+        runner.clearTransferState();
+        raf.write("carriage\rreturn\r".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("carriage\r");
+
+        runner.clearTransferState();
+        raf.write("\r\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("return\r\r\n");
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/31f0909b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/logback-test.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/logback-test.xml
index 139b232..fad019a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/logback-test.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/logback-test.xml
@@ -22,6 +22,21 @@
         </encoder>
     </appender>
     
+    <appender name="TARGET_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>target/log.txt</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>target/log_%d{yyyy-MM-dd_HH}.%i.txt</fileNamePattern>
+            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+                <maxFileSize>10KB</maxFileSize>
+            </timeBasedFileNamingAndTriggeringPolicy>
+            <!-- keep 30 hours' worth of history (maxHistory counts hourly rollover periods, not individual files) -->
+            <maxHistory>30</maxHistory>
+        </rollingPolicy>
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
+            <immediateFlush>true</immediateFlush>
+        </encoder>
+    </appender>    
     <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR -->
     <logger name="org.apache.nifi" level="INFO"/>
     
@@ -41,6 +56,9 @@
     <logger name="com.sun.jersey.spi.container.servlet.WebComponent" level="ERROR"/>
 
     <logger name="org.apache.nifi.processors.standard" level="DEBUG"/>
+    <logger name="target.file" level="DEBUG" additivity="true">
+        <appender-ref ref="TARGET_FILE" />
+    </logger>
 
     <root level="INFO">
         <appender-ref ref="CONSOLE"/>


[3/4] nifi git commit: NIFI-994: Fixed issue that could result in data duplication if more than 1 rollover of tailed file has occurred on restart of Processor

Posted by ma...@apache.org.
NIFI-994: Fixed issue that could result in data duplication if more than 1 rollover of tailed file has occurred on restart of Processor


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7b9c8df6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7b9c8df6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7b9c8df6

Branch: refs/heads/master
Commit: 7b9c8df6c593059d063770095ab9efcf3c82467e
Parents: bfa9e45
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Nov 10 16:45:46 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Nov 10 16:53:29 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/TailFile.java      | 127 +++++++++++++------
 .../nifi/processors/standard/TestTailFile.java  |  85 +++++++++++++
 2 files changed, 174 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7b9c8df6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index dfe489e..4d7e6f5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -18,7 +18,6 @@
 package org.apache.nifi.processors.standard;
 
 import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -60,6 +59,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.stream.io.NullOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.LongHolder;
@@ -179,27 +179,38 @@ public class TailFile extends AbstractProcessor {
                     // We have an expected checksum and the currently configured filename is the same as the state file.
                     // We need to check if the existing file is the same as the one referred to in the state file based on
                     // the checksum.
+                    final Checksum checksum = new CRC32();
                     final File existingTailFile = new File(filename);
                     if (existingTailFile.length() >= position) {
                         try (final InputStream tailFileIs = new FileInputStream(existingTailFile);
-                            final CheckedInputStream in = new CheckedInputStream(tailFileIs, new CRC32())) {
+                            final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) {
                             StreamUtils.copy(in, new NullOutputStream(), state.getPosition());
 
                             final long checksumResult = in.getChecksum().getValue();
                             if (checksumResult == expectedRecoveryChecksum) {
+                                // Checksums match. This means that we want to resume reading from where we left off.
+                                // So we will populate the reader object so that it will be used in onTrigger. If the
+                                // checksums do not match, then we will leave the reader object null, so that the next
+                                // call to onTrigger will result in a new Reader being created and starting at the
+                                // beginning of the file.
+                                getLogger().debug("When recovering state, checksum of tailed file matches the stored checksum. Will resume where left off.");
                                 tailFile = existingTailFile;
                                 reader = new RandomAccessFile(tailFile, "r");
                                 reader.seek(position);
+                            } else {
+                                getLogger().debug("When recovering state, checksum of tailed file does not match the stored checksum. Will begin tailing current file from beginning.");
                             }
                         }
                     }
 
-                    state = new TailFileState(tailFilename, tailFile, reader, position, timestamp, null, new byte[65536]);
+                    state = new TailFileState(tailFilename, tailFile, reader, position, timestamp, checksum, new byte[65536]);
                 } else {
                     expectedRecoveryChecksum = null;
                     // tailing a new file since the state file was written out. We will reset state.
                     state = new TailFileState(tailFilename, null, null, 0L, 0L, null, new byte[65536]);
                 }
+
+                getLogger().debug("Recovered state {}", new Object[] {state});
             } else {
                 // encoding Version == -1... no data in file. Just move on.
             }
@@ -209,6 +220,8 @@ public class TailFile extends AbstractProcessor {
 
 
     public void persistState(final TailFileState state, final String stateFilename) throws IOException {
+        getLogger().debug("Persisting state {} to {}", new Object[] {state, stateFilename});
+
         final File stateFile = new File(stateFilename);
         File directory = stateFile.getParentFile();
         if (directory != null && !directory.exists() && !directory.mkdirs()) {
@@ -279,6 +292,7 @@ public class TailFile extends AbstractProcessor {
             // a file when we stopped running, then that file that we were reading from should be the first file in this list,
             // assuming that the file still exists on the file system.
             final List<File> rolledOffFiles = getRolledOffFiles(context, state.getTimestamp());
+            getLogger().debug("Recovering Rolled Off Files; total number of files rolled off = {}", new Object[] {rolledOffFiles.size()});
 
             // For first file that we find, it may or may not be the file that we were last reading from.
             // As a result, we have to read up to the position we stored, while calculating the checksum. If the checksums match,
@@ -295,6 +309,8 @@ public class TailFile extends AbstractProcessor {
 
                     final long checksumResult = in.getChecksum().getValue();
                     if (checksumResult == expectedRecoveryChecksum) {
+                        getLogger().debug("Checksum for {} matched expected checksum. Will skip first {} bytes", new Object[] {firstFile, state.getPosition()});
+
                         // This is the same file that we were reading when we shutdown. Start reading from this point on.
                         rolledOffFiles.remove(0);
                         FlowFile flowFile = session.create();
@@ -317,6 +333,9 @@ public class TailFile extends AbstractProcessor {
                             session.commit();
                             persistState(state, context.getProperty(STATE_FILE).getValue());
                         }
+                    } else {
+                        getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file",
+                            new Object[] {firstFile, checksumResult, expectedRecoveryChecksum});
                     }
                 }
             }
@@ -425,11 +444,15 @@ public class TailFile extends AbstractProcessor {
             // marked position.
             try {
                 final List<File> updatedRolledOverFiles = getRolledOffFiles(context, timestamp);
+                getLogger().debug("Tailed file has rotated. Total number of rolled off files to check for un-consumed modifications: {}", new Object[] {updatedRolledOverFiles.size()});
+
                 if (!updatedRolledOverFiles.isEmpty()) {
                     final File lastRolledOver = updatedRolledOverFiles.get(updatedRolledOverFiles.size() - 1);
 
                     // there is more data in the file that has not yet been consumed.
                     if (lastRolledOver.length() > state.getPosition()) {
+                        getLogger().debug("Last rolled over file {} is larger than our last position; will consume data from it after offset {}", new Object[] {lastRolledOver, state.getPosition()});
+
                         try (final FileInputStream fis = new FileInputStream(lastRolledOver)) {
                             StreamUtils.skip(fis, state.getPosition());
 
@@ -450,6 +473,8 @@ public class TailFile extends AbstractProcessor {
                                 persistState(state, context.getProperty(STATE_FILE).getValue());
                             }
                         }
+                    } else {
+                        getLogger().debug("Last rolled over file {} is not larger than our last position; will not consume data from it", new Object[] {lastRolledOver});
                     }
                 }
             } catch (final IOException ioe) {
@@ -531,11 +556,11 @@ public class TailFile extends AbstractProcessor {
                     TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                 session.transfer(flowFile, REL_SUCCESS);
                 position = positionHolder.get();
+                timestamp = System.currentTimeMillis();
             }
         }
 
         // Create a new state object to represent our current position, timestamp, etc.
-        timestamp = System.currentTimeMillis();
         final TailFileState updatedState = new TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, timestamp, checksum, state.getBuffer());
         this.state = updatedState;
 
@@ -571,47 +596,61 @@ public class TailFile extends AbstractProcessor {
      * @throws java.io.IOException if an I/O error occurs.
      */
     private long readLines(final RandomAccessFile reader, final byte[] buffer, final OutputStream out, final Checksum checksum) throws IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        long pos = reader.getFilePointer();
-        long rePos = pos; // position to re-read
-
-        int num;
-        boolean seenCR = false;
-        while (((num = reader.read(buffer)) != -1)) {
-            for (int i = 0; i < num; i++) {
-                byte ch = buffer[i];
-
-                switch (ch) {
-                    case '\n':
-                        baos.write(ch);
-                        seenCR = false;
-                        baos.writeTo(out);
-                        baos.reset();
-                        rePos = pos + i + 1;
-                        break;
-                    case '\r':
-                        baos.write(ch);
-                        seenCR = true;
-                        break;
-                    default:
-                        if (seenCR) {
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            long pos = reader.getFilePointer();
+            long rePos = pos; // position to re-read
+
+            int num;
+            int linesRead = 0;
+            boolean seenCR = false;
+            while (((num = reader.read(buffer)) != -1)) {
+                for (int i = 0; i < num; i++) {
+                    byte ch = buffer[i];
+
+                    switch (ch) {
+                        case '\n':
+                            baos.write(ch);
                             seenCR = false;
                             baos.writeTo(out);
+                            checksum.update(baos.getUnderlyingBuffer(), 0, baos.size());
+                            if (getLogger().isTraceEnabled()) {
+                                getLogger().trace("Checksum updated to {}", new Object[] {checksum.getValue()});
+                            }
+
                             baos.reset();
+                            rePos = pos + i + 1;
+                            linesRead++;
+                            break;
+                        case '\r':
                             baos.write(ch);
-                            rePos = pos + i;
-                        } else {
-                            baos.write(ch);
-                        }
+                            seenCR = true;
+                            break;
+                        default:
+                            if (seenCR) {
+                                seenCR = false;
+                                baos.writeTo(out);
+                                checksum.update(baos.getUnderlyingBuffer(), 0, baos.size());
+                                if (getLogger().isTraceEnabled()) {
+                                    getLogger().trace("Checksum updated to {}", new Object[] {checksum.getValue()});
+                                }
+
+                                linesRead++;
+                                baos.reset();
+                                baos.write(ch);
+                                rePos = pos + i;
+                            } else {
+                                baos.write(ch);
+                            }
+                    }
                 }
+
+                pos = reader.getFilePointer();
             }
 
-            checksum.update(buffer, 0, num);
-            pos = reader.getFilePointer();
+            getLogger().debug("Read {} lines; repositioning reader from {} to {}", new Object[] {linesRead, pos, rePos});
+            reader.seek(rePos); // Ensure we can re-read if necessary
+            return rePos;
         }
-
-        reader.seek(rePos); // Ensure we can re-read if necessary
-        return rePos;
     }
 
 
@@ -650,7 +689,14 @@ public class TailFile extends AbstractProcessor {
         final DirectoryStream<Path> dirStream = Files.newDirectoryStream(directory.toPath(), rollingPattern);
         for (final Path path : dirStream) {
             final File file = path.toFile();
-            if (file.lastModified() < minTimestamp || file.equals(tailFile)) {
+            final long lastMod = file.lastModified();
+
+            if (file.lastModified() < minTimestamp) {
+                getLogger().debug("Found rolled off file {} but its last modified timestamp is before the cutoff (Last Mod = {}, Cutoff = {}) so will not consume it",
+                    new Object[] {file, lastMod, minTimestamp});
+
+                continue;
+            } else if (file.equals(tailFile)) {
                 continue;
             }
 
@@ -725,5 +771,10 @@ public class TailFile extends AbstractProcessor {
         public byte[] getBuffer() {
             return buffer;
         }
+
+        @Override
+        public String toString() {
+            return "TailFileState[filename=" + filename + ", position=" + position + ", timestamp=" + timestamp + ", checksum=" + (checksum == null ? "null" : checksum.getValue()) + "]";
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7b9c8df6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index 993f2b1..85638ad 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -42,6 +42,8 @@ public class TestTailFile {
 
     @Before
     public void setup() throws IOException {
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "TRACE");
+
         final File targetDir = new File("target");
         final File[] files = targetDir.listFiles(new FilenameFilter() {
             @Override
@@ -240,6 +242,89 @@ public class TestTailFile {
         runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n");
     }
 
+    @Test
+    public void testRolloverAfterHavingReadAllData() throws IOException, InterruptedException {
+        // If we have read all data in a file, and that file does not end with a new-line, then the last line
+        // in the file will have been read, added to the checksum, and then we would re-seek to "unread" that
+        // last line since it didn't have a new-line. We need to ensure that if the data is then rolled over
+        // that our checksum does not take into account those bytes that have been "unread."
+
+        // this mimics the case when we are reading a log file that rolls over while processor is running.
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+
+        Thread.sleep(1000L);
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); // should not pull in data because no \n
+
+        raf.close();
+        file.renameTo(new File("target/log.1"));
+
+        raf = new RandomAccessFile(new File("target/log.txt"), "rw");
+        raf.write("1\n".getBytes());
+        runner.run(1, false, false);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n");
+    }
+
+
+    @Test
+    public void testMultipleRolloversAfterHavingReadAllData() throws IOException, InterruptedException {
+        // this mimics the case when we are reading a log file that rolls over while processor is running.
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+        runner.run(1); // ensure that we've read 'world' but not consumed it into a flowfile.
+
+        Thread.sleep(1000L);
+
+        // rename file to log.2
+        raf.close();
+        file.renameTo(new File("target/log.2"));
+
+        // write to a new file.
+        file = new File("target/log.txt");
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("abc\n".getBytes());
+
+        // rename file to log.1
+        raf.close();
+        file.renameTo(new File("target/log.1"));
+
+        // write to a new file.
+        file = new File("target/log.txt");
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("1\n".getBytes());
+        raf.close();
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 3);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("abc\n");
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("1\n");
+    }
+
 
     @Test
     public void testConsumeWhenNewLineFound() throws IOException, InterruptedException {


[2/4] nifi git commit: NIFI-994: Ensure that processor is not valid due to the tail file not yet existing

Posted by ma...@apache.org.
NIFI-994: Ensure that processor is not valid due to the tail file not yet existing


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bfa9e450
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bfa9e450
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bfa9e450

Branch: refs/heads/master
Commit: bfa9e450798591db11a7b520cd01388f8819d865
Parents: 31f0909
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Oct 28 13:44:28 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Oct 30 11:44:51 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/TailFile.java      | 130 ++++++++++++++-----
 .../nifi/processors/standard/TestTailFile.java  |  90 ++++++++++++-
 2 files changed, 188 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bfa9e450/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 2fe9431..dfe489e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -49,6 +49,7 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -72,11 +73,18 @@ import org.apache.nifi.util.LongHolder;
     + "was not running (provided that the data still exists upon restart of NiFi).")
 public class TailFile extends AbstractProcessor {
 
+    static final AllowableValue START_BEGINNING_OF_TIME = new AllowableValue("Beginning of Time", "Beginning of Time",
+        "Start with the oldest data that matches the Rolling Filename Pattern and then begin reading from the File to Tail");
+    static final AllowableValue START_CURRENT_FILE = new AllowableValue("Beginning of File", "Beginning of File",
+        "Start with the beginning of the File to Tail. Do not ingest any data that has already been rolled over");
+    static final AllowableValue START_CURRENT_TIME = new AllowableValue("Current Time", "Current Time",
+        "Start with the data at the end of the File to Tail. Do not ingest any data thas has already been rolled over or any data in the File to Tail that has already been written.");
+
     static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
         .name("File to Tail")
         .description("Fully-qualified filename of the file that should be tailed")
         .expressionLanguageSupported(false)
-        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .required(true)
         .build();
     static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new PropertyDescriptor.Builder()
@@ -95,6 +103,14 @@ public class TailFile extends AbstractProcessor {
         .expressionLanguageSupported(false)
         .required(true)
         .build();
+    static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder()
+        .name("Initial Start Position")
+        .description("When the Processor first begins to tail data, this property specifies where the Processor should begin reading data. Once data has been ingested from the file, "
+            + "the Processor will continue from the last point from which it has received data.")
+        .allowableValues(START_BEGINNING_OF_TIME, START_CURRENT_FILE, START_CURRENT_TIME)
+        .defaultValue(START_CURRENT_FILE.getValue())
+        .required(true)
+        .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
@@ -104,6 +120,7 @@ public class TailFile extends AbstractProcessor {
     private volatile TailFileState state = new TailFileState(null, null, null, 0L, 0L, null, new byte[65536]);
     private volatile boolean recoveredRolledFiles = false;
     private volatile Long expectedRecoveryChecksum;
+    private volatile boolean tailFileChanged = false;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -111,6 +128,7 @@ public class TailFile extends AbstractProcessor {
         properties.add(FILENAME);
         properties.add(ROLLING_FILENAME_PATTERN);
         properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(STATE_FILE).defaultValue("./conf/state/" + getIdentifier()).build());
+        properties.add(START_POSITION);
         return properties;
     }
 
@@ -124,6 +142,7 @@ public class TailFile extends AbstractProcessor {
         if (FILENAME.equals(descriptor)) {
             state = new TailFileState(newValue, null, null, 0L, 0L, null, new byte[65536]);
             recoveredRolledFiles = false;
+            tailFileChanged = true;
         } else if (ROLLING_FILENAME_PATTERN.equals(descriptor)) {
             recoveredRolledFiles = false;
         }
@@ -218,7 +237,7 @@ public class TailFile extends AbstractProcessor {
         try {
             reader = new RandomAccessFile(file, "r");
         } catch (final FileNotFoundException fnfe) {
-            getLogger().debug("File {} does not exist; yielding and returning", new Object[] {file});
+            getLogger().warn("File {} does not exist; will attempt to access file again after the configured Yield Duration has elapsed", new Object[] {file});
             return null;
         }
 
@@ -280,18 +299,24 @@ public class TailFile extends AbstractProcessor {
                         rolledOffFiles.remove(0);
                         FlowFile flowFile = session.create();
                         flowFile = session.importFrom(in, flowFile);
-                        flowFile = session.putAttribute(flowFile, "filename", firstFile.getName());
+                        if (flowFile.getSize() == 0L) {
+                            session.remove(flowFile);
+                            // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
+                            state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer());
+                        } else {
+                            flowFile = session.putAttribute(flowFile, "filename", firstFile.getName());
 
-                        session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), "FlowFile contains bytes 0 through " + state.getPosition() + " of source file",
-                            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
-                        session.transfer(flowFile, REL_SUCCESS);
+                            session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), "FlowFile contains bytes 0 through " + state.getPosition() + " of source file",
+                                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+                            session.transfer(flowFile, REL_SUCCESS);
 
-                        // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
-                        state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer());
+                            // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
+                            state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer());
 
-                        // must ensure that we do session.commit() before persisting state in order to avoid data loss.
-                        session.commit();
-                        persistState(state, context.getProperty(STATE_FILE).getValue());
+                            // must ensure that we do session.commit() before persisting state in order to avoid data loss.
+                            session.commit();
+                            persistState(state, context.getProperty(STATE_FILE).getValue());
+                        }
                     }
                 }
             }
@@ -303,16 +328,20 @@ public class TailFile extends AbstractProcessor {
             for (final File file : rolledOffFiles) {
                 FlowFile flowFile = session.create();
                 flowFile = session.importFrom(file.toPath(), true, flowFile);
-                flowFile = session.putAttribute(flowFile, "filename", file.getName());
-                session.getProvenanceReporter().receive(flowFile, file.toURI().toString());
-                session.transfer(flowFile, REL_SUCCESS);
+                if (flowFile.getSize() == 0L) {
+                    session.remove(flowFile);
+                } else {
+                    flowFile = session.putAttribute(flowFile, "filename", file.getName());
+                    session.getProvenanceReporter().receive(flowFile, file.toURI().toString());
+                    session.transfer(flowFile, REL_SUCCESS);
 
-                // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
-                state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, file.lastModified() + 1L, null, state.getBuffer());
+                    // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
+                    state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, file.lastModified() + 1L, null, state.getBuffer());
 
-                // must ensure that we do session.commit() before persisting state in order to avoid data loss.
-                session.commit();
-                persistState(state, context.getProperty(STATE_FILE).getValue());
+                    // must ensure that we do session.commit() before persisting state in order to avoid data loss.
+                    session.commit();
+                    persistState(state, context.getProperty(STATE_FILE).getValue());
+                }
             }
         } catch (final IOException e) {
             getLogger().error("Failed to recover files that have rolled over due to {}", new Object[] {e});
@@ -320,13 +349,47 @@ public class TailFile extends AbstractProcessor {
     }
 
 
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         // If this is the first time the processor has run since it was started, we need to check for any files that may have rolled over
         // while the processor was stopped. If we find any, we need to import them into the flow.
         if (!recoveredRolledFiles) {
-            recoverRolledFiles(context, session);
+            if (tailFileChanged) {
+                final String recoverPosition = context.getProperty(START_POSITION).getValue();
+
+                if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
+                    recoverRolledFiles(context, session);
+                } else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) {
+                    state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 0L, null, state.getBuffer());
+                } else {
+                    final String filename = context.getProperty(FILENAME).getValue();
+                    final File file = new File(filename);
+
+                    try {
+                        final RandomAccessFile raf = new RandomAccessFile(file, "r");
+                        final Checksum checksum = new CRC32();
+                        final long position = file.length();
+                        final long timestamp = file.lastModified();
+
+                        try (final InputStream fis = new FileInputStream(file);
+                            final CheckedInputStream in = new CheckedInputStream(fis, checksum)) {
+                            StreamUtils.copy(in, new NullOutputStream(), position);
+                        }
+
+                        raf.seek(position);
+                        state = new TailFileState(filename, file, raf, position, timestamp, checksum, state.getBuffer());
+                    } catch (final IOException ioe) {
+                        getLogger().error("Attempted to position Reader at current position in file {} but failed to do so due to {}", new Object[] {file, ioe.toString()}, ioe);
+                        context.yield();
+                        return;
+                    }
+                }
+
+                tailFileChanged = false;
+            } else {
+                recoverRolledFiles(context, session);
+            }
+
             recoveredRolledFiles = true;
         }
 
@@ -372,16 +435,20 @@ public class TailFile extends AbstractProcessor {
 
                             FlowFile flowFile = session.create();
                             flowFile = session.importFrom(fis, flowFile);
-                            flowFile = session.putAttribute(flowFile, "filename", lastRolledOver.getName());
-
-                            session.getProvenanceReporter().receive(flowFile, lastRolledOver.toURI().toString(), "FlowFile contains bytes " + state.getPosition() + " through " +
-                                lastRolledOver.length() + " of source file", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
-                            session.transfer(flowFile, REL_SUCCESS);
-                            this.state = state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, lastRolledOver.lastModified() + 1L, null, state.getBuffer());
-
-                            // must ensure that we do session.commit() before persisting state in order to avoid data loss.
-                            session.commit();
-                            persistState(state, context.getProperty(STATE_FILE).getValue());
+                            if (flowFile.getSize() == 0) {
+                                session.remove(flowFile);
+                            } else {
+                                flowFile = session.putAttribute(flowFile, "filename", lastRolledOver.getName());
+
+                                session.getProvenanceReporter().receive(flowFile, lastRolledOver.toURI().toString(), "FlowFile contains bytes " + state.getPosition() + " through " +
+                                    lastRolledOver.length() + " of source file", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+                                session.transfer(flowFile, REL_SUCCESS);
+                                this.state = state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, lastRolledOver.lastModified() + 1L, null, state.getBuffer());
+
+                                // must ensure that we do session.commit() before persisting state in order to avoid data loss.
+                                session.commit();
+                                persistState(state, context.getProperty(STATE_FILE).getValue());
+                            }
                         }
                     }
                 }
@@ -429,6 +496,7 @@ public class TailFile extends AbstractProcessor {
         if (consumeData) {
             // data has been written to file. Stream it to a new FlowFile.
             FlowFile flowFile = session.create();
+
             final RandomAccessFile fileReader = reader;
             final LongHolder positionHolder = new LongHolder(position);
             flowFile = session.write(flowFile, new OutputStreamCallback() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/bfa9e450/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index d282a9d..993f2b1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 
 import org.apache.nifi.processors.standard.TailFile.TailFileState;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.After;
@@ -78,7 +79,40 @@ public class TestTailFile {
 
 
     @Test
-    public void testConsumeAfterTruncation() throws IOException {
+    public void testConsumeAfterTruncationStartAtBeginningOfFile() throws IOException, InterruptedException {
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
+        runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE.getValue());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        // roll over the file
+        raf.close();
+        file.renameTo(new File(file.getParentFile(), file.getName() + ".previous"));
+        raf = new RandomAccessFile(file, "rw");
+
+        // truncate file
+        raf.setLength(0L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        // write some bytes to the file.
+        Thread.sleep(1000L); // we need to wait at least one second because of the granularity of timestamps on many file systems.
+        raf.write("HELLO\n".getBytes());
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("HELLO\n");
+    }
+
+    @Test
+    public void testConsumeAfterTruncationStartAtCurrentTime() throws IOException, InterruptedException {
+        runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_TIME.getValue());
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
 
@@ -92,6 +126,8 @@ public class TestTailFile {
         raf.setLength(0L);
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        Thread.sleep(1000L); // we need to wait at least one second because of the granularity of timestamps on many file systems.
         raf.write("HELLO\n".getBytes());
 
         runner.run();
@@ -101,6 +137,58 @@ public class TestTailFile {
 
 
     @Test
+    public void testStartAtBeginningOfFile() throws IOException, InterruptedException {
+        runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE.getValue());
+
+        raf.write("hello world\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello world\n");
+    }
+
+    @Test
+    public void testStartAtCurrentTime() throws IOException, InterruptedException {
+        runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_TIME.getValue());
+
+        raf.write("hello world\n".getBytes());
+        runner.run(100);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void testStartAtBeginningOfTime() throws IOException, InterruptedException {
+        raf.write("hello".getBytes());
+        raf.close();
+        file.renameTo(new File(file.getParentFile(), file.getName() + ".previous"));
+
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("world\n".getBytes());
+
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
+        runner.setProperty(TailFile.START_POSITION, TailFile.START_BEGINNING_OF_TIME.getValue());
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+
+        boolean world = false;
+        boolean hello = false;
+        for (final MockFlowFile mff : runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS)) {
+            final String content = new String(mff.toByteArray());
+            if ("world\n".equals(content)) {
+                world = true;
+            } else if ("hello".equals(content)) {
+                hello = true;
+            } else {
+                Assert.fail("Got unexpected content: " + content);
+            }
+        }
+
+        assertTrue(hello);
+        assertTrue(world);
+    }
+
+
+    @Test
     public void testRemainderOfFileRecoveredAfterRestart() throws IOException {
         runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log*.txt");
         runner.run();


[4/4] nifi git commit: Merge branch 'NIFI-994'

Posted by ma...@apache.org.
Merge branch 'NIFI-994'


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7a165b62
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7a165b62
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7a165b62

Branch: refs/heads/master
Commit: 7a165b62cc4c46f92b4b4ed7c233f464cf63b3ef
Parents: 854d203 7b9c8df
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Nov 11 16:05:40 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Nov 11 16:05:40 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/TailFile.java      | 780 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../nifi/processors/standard/TestTailFile.java  | 378 +++++++++
 .../src/test/resources/logback-test.xml         |  18 +
 4 files changed, 1177 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7a165b62/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------