You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2014/12/22 12:59:36 UTC
[24/31] incubator-nifi git commit: NIFI-189 Collapsed several commons
libs into one
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/ByteCountingInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/ByteCountingInputStream.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/ByteCountingInputStream.java
deleted file mode 100644
index 792cc32..0000000
--- a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/ByteCountingInputStream.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-public class ByteCountingInputStream extends InputStream {
-
- private final InputStream in;
- private long bytesRead = 0L;
- private long bytesSkipped = 0L;
-
- private long bytesSinceMark = 0L;
-
- public ByteCountingInputStream(final InputStream in) {
- this.in = in;
- }
-
- @Override
- public int read() throws IOException {
- final int fromSuper = in.read();
- if (fromSuper >= 0) {
- bytesRead++;
- bytesSinceMark++;
- }
- return fromSuper;
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- final int fromSuper = in.read(b, off, len);
- if (fromSuper >= 0) {
- bytesRead += fromSuper;
- bytesSinceMark += fromSuper;
- }
-
- return fromSuper;
- }
-
- @Override
- public int read(byte[] b) throws IOException {
- return read(b, 0, b.length);
- }
-
- @Override
- public long skip(final long n) throws IOException {
- final long skipped = in.skip(n);
- if (skipped >= 0) {
- bytesSkipped += skipped;
- bytesSinceMark += skipped;
- }
- return skipped;
- }
-
- public long getBytesRead() {
- return bytesRead;
- }
-
- public long getBytesSkipped() {
- return bytesSkipped;
- }
-
- public long getBytesConsumed() {
- return getBytesRead() + getBytesSkipped();
- }
-
- @Override
- public void mark(final int readlimit) {
- in.mark(readlimit);
-
- bytesSinceMark = 0L;
- }
-
- @Override
- public boolean markSupported() {
- return in.markSupported();
- }
-
- @Override
- public void reset() throws IOException {
- in.reset();
- bytesRead -= bytesSinceMark;
- }
-
- @Override
- public void close() throws IOException {
- in.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/ByteCountingOutputStream.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/ByteCountingOutputStream.java
deleted file mode 100644
index c7b77ff..0000000
--- a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/ByteCountingOutputStream.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class ByteCountingOutputStream extends OutputStream {
-
- private final OutputStream out;
- private long bytesWritten = 0L;
-
- public ByteCountingOutputStream(final OutputStream out) {
- this.out = out;
- }
-
- @Override
- public void write(int b) throws IOException {
- out.write(b);
- bytesWritten++;
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- write(b, 0, b.length);
- }
-
- ;
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- out.write(b, off, len);
- bytesWritten += len;
- }
-
- public long getBytesWritten() {
- return bytesWritten;
- }
-
- @Override
- public void flush() throws IOException {
- out.flush();
- }
-
- @Override
- public void close() throws IOException {
- out.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/DataOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/DataOutputStream.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/DataOutputStream.java
deleted file mode 100644
index 6af06d3..0000000
--- a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/DataOutputStream.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.DataOutput;
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-
-/**
- * This class is different from java.io.DataOutputStream in that it does
- * synchronize on its methods.
- */
-public class DataOutputStream extends FilterOutputStream implements DataOutput {
-
- /**
- * The number of bytes written to the data output stream so far. If this
- * counter overflows, it will be wrapped to Integer.MAX_VALUE.
- */
- protected int written;
-
- /**
- * bytearr is initialized on demand by writeUTF
- */
- private byte[] bytearr = null;
-
- /**
- * Creates a new data output stream to write data to the specified
- * underlying output stream. The counter <code>written</code> is set to
- * zero.
- *
- * @param out the underlying output stream, to be saved for later use.
- * @see java.io.FilterOutputStream#out
- */
- public DataOutputStream(OutputStream out) {
- super(out);
- }
-
- /**
- * Increases the written counter by the specified value until it reaches
- * Integer.MAX_VALUE.
- */
- private void incCount(int value) {
- int temp = written + value;
- if (temp < 0) {
- temp = Integer.MAX_VALUE;
- }
- written = temp;
- }
-
- /**
- * Writes the specified byte (the low eight bits of the argument
- * <code>b</code>) to the underlying output stream. If no exception is
- * thrown, the counter <code>written</code> is incremented by
- * <code>1</code>.
- * <p>
- * Implements the <code>write</code> method of <code>OutputStream</code>.
- *
- * @param b the <code>byte</code> to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public void write(int b) throws IOException {
- out.write(b);
- incCount(1);
- }
-
- /**
- * Writes <code>len</code> bytes from the specified byte array starting at
- * offset <code>off</code> to the underlying output stream. If no exception
- * is thrown, the counter <code>written</code> is incremented by
- * <code>len</code>.
- *
- * @param b the data.
- * @param off the start offset in the data.
- * @param len the number of bytes to write.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public void write(byte b[], int off, int len) throws IOException {
- out.write(b, off, len);
- incCount(len);
- }
-
- /**
- * Flushes this data output stream. This forces any buffered output bytes to
- * be written out to the stream.
- * <p>
- * The <code>flush</code> method of <code>DataOutputStream</code> calls the
- * <code>flush</code> method of its underlying output stream.
- *
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- * @see java.io.OutputStream#flush()
- */
- @Override
- public void flush() throws IOException {
- out.flush();
- }
-
- /**
- * Writes a <code>boolean</code> to the underlying output stream as a 1-byte
- * value. The value <code>true</code> is written out as the value
- * <code>(byte)1</code>; the value <code>false</code> is written out as the
- * value <code>(byte)0</code>. If no exception is thrown, the counter
- * <code>written</code> is incremented by <code>1</code>.
- *
- * @param v a <code>boolean</code> value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeBoolean(boolean v) throws IOException {
- out.write(v ? 1 : 0);
- incCount(1);
- }
-
- /**
- * Writes out a <code>byte</code> to the underlying output stream as a
- * 1-byte value. If no exception is thrown, the counter <code>written</code>
- * is incremented by <code>1</code>.
- *
- * @param v a <code>byte</code> value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeByte(int v) throws IOException {
- out.write(v);
- incCount(1);
- }
-
- /**
- * Writes a <code>short</code> to the underlying output stream as two bytes,
- * high byte first. If no exception is thrown, the counter
- * <code>written</code> is incremented by <code>2</code>.
- *
- * @param v a <code>short</code> to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeShort(int v) throws IOException {
- out.write((v >>> 8) & 0xFF);
- out.write((v) & 0xFF);
- incCount(2);
- }
-
- /**
- * Writes a <code>char</code> to the underlying output stream as a 2-byte
- * value, high byte first. If no exception is thrown, the counter
- * <code>written</code> is incremented by <code>2</code>.
- *
- * @param v a <code>char</code> value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeChar(int v) throws IOException {
- out.write((v >>> 8) & 0xFF);
- out.write((v) & 0xFF);
- incCount(2);
- }
-
- /**
- * Writes an <code>int</code> to the underlying output stream as four bytes,
- * high byte first. If no exception is thrown, the counter
- * <code>written</code> is incremented by <code>4</code>.
- *
- * @param v an <code>int</code> to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeInt(int v) throws IOException {
- out.write((v >>> 24) & 0xFF);
- out.write((v >>> 16) & 0xFF);
- out.write((v >>> 8) & 0xFF);
- out.write((v) & 0xFF);
- incCount(4);
- }
-
- private final byte writeBuffer[] = new byte[8];
-
- /**
- * Writes a <code>long</code> to the underlying output stream as eight
- * bytes, high byte first. In no exception is thrown, the counter
- * <code>written</code> is incremented by <code>8</code>.
- *
- * @param v a <code>long</code> to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeLong(long v) throws IOException {
- writeBuffer[0] = (byte) (v >>> 56);
- writeBuffer[1] = (byte) (v >>> 48);
- writeBuffer[2] = (byte) (v >>> 40);
- writeBuffer[3] = (byte) (v >>> 32);
- writeBuffer[4] = (byte) (v >>> 24);
- writeBuffer[5] = (byte) (v >>> 16);
- writeBuffer[6] = (byte) (v >>> 8);
- writeBuffer[7] = (byte) (v);
- out.write(writeBuffer, 0, 8);
- incCount(8);
- }
-
- /**
- * Converts the float argument to an <code>int</code> using the
- * <code>floatToIntBits</code> method in class <code>Float</code>, and then
- * writes that <code>int</code> value to the underlying output stream as a
- * 4-byte quantity, high byte first. If no exception is thrown, the counter
- * <code>written</code> is incremented by <code>4</code>.
- *
- * @param v a <code>float</code> value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- * @see java.lang.Float#floatToIntBits(float)
- */
- @Override
- public final void writeFloat(float v) throws IOException {
- writeInt(Float.floatToIntBits(v));
- }
-
- /**
- * Converts the double argument to a <code>long</code> using the
- * <code>doubleToLongBits</code> method in class <code>Double</code>, and
- * then writes that <code>long</code> value to the underlying output stream
- * as an 8-byte quantity, high byte first. If no exception is thrown, the
- * counter <code>written</code> is incremented by <code>8</code>.
- *
- * @param v a <code>double</code> value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- * @see java.lang.Double#doubleToLongBits(double)
- */
- @Override
- public final void writeDouble(double v) throws IOException {
- writeLong(Double.doubleToLongBits(v));
- }
-
- /**
- * Writes out the string to the underlying output stream as a sequence of
- * bytes. Each character in the string is written out, in sequence, by
- * discarding its high eight bits. If no exception is thrown, the counter
- * <code>written</code> is incremented by the length of <code>s</code>.
- *
- * @param s a string of bytes to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeBytes(String s) throws IOException {
- int len = s.length();
- for (int i = 0; i < len; i++) {
- out.write((byte) s.charAt(i));
- }
- incCount(len);
- }
-
- /**
- * Writes a string to the underlying output stream as a sequence of
- * characters. Each character is written to the data output stream as if by
- * the <code>writeChar</code> method. If no exception is thrown, the counter
- * <code>written</code> is incremented by twice the length of
- * <code>s</code>.
- *
- * @param s a <code>String</code> value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.DataOutputStream#writeChar(int)
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeChars(String s) throws IOException {
- int len = s.length();
- for (int i = 0; i < len; i++) {
- int v = s.charAt(i);
- out.write((v >>> 8) & 0xFF);
- out.write((v) & 0xFF);
- }
- incCount(len * 2);
- }
-
- /**
- * Writes a string to the underlying output stream using
- * <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
- * encoding in a machine-independent manner.
- * <p>
- * First, two bytes are written to the output stream as if by the
- * <code>writeShort</code> method giving the number of bytes to follow. This
- * value is the number of bytes actually written out, not the length of the
- * string. Following the length, each character of the string is output, in
- * sequence, using the modified UTF-8 encoding for the character. If no
- * exception is thrown, the counter <code>written</code> is incremented by
- * the total number of bytes written to the output stream. This will be at
- * least two plus the length of <code>str</code>, and at most two plus
- * thrice the length of <code>str</code>.
- *
- * @param str a string to be written.
- * @exception IOException if an I/O error occurs.
- */
- @Override
- public final void writeUTF(String str) throws IOException {
- writeUTF(str, this);
- }
-
- /**
- * Writes a string to the specified DataOutput using
- * <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
- * encoding in a machine-independent manner.
- * <p>
- * First, two bytes are written to out as if by the <code>writeShort</code>
- * method giving the number of bytes to follow. This value is the number of
- * bytes actually written out, not the length of the string. Following the
- * length, each character of the string is output, in sequence, using the
- * modified UTF-8 encoding for the character. If no exception is thrown, the
- * counter <code>written</code> is incremented by the total number of bytes
- * written to the output stream. This will be at least two plus the length
- * of <code>str</code>, and at most two plus thrice the length of
- * <code>str</code>.
- *
- * @param str a string to be written.
- * @param out destination to write to
- * @return The number of bytes written out.
- * @exception IOException if an I/O error occurs.
- */
- static int writeUTF(String str, DataOutput out) throws IOException {
- int strlen = str.length();
- int utflen = 0;
- int c, count = 0;
-
- /* use charAt instead of copying String to char array */
- for (int i = 0; i < strlen; i++) {
- c = str.charAt(i);
- if ((c >= 0x0001) && (c <= 0x007F)) {
- utflen++;
- } else if (c > 0x07FF) {
- utflen += 3;
- } else {
- utflen += 2;
- }
- }
-
- if (utflen > 65535) {
- throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
- }
-
- byte[] bytearr = null;
- if (out instanceof DataOutputStream) {
- DataOutputStream dos = (DataOutputStream) out;
- if (dos.bytearr == null || (dos.bytearr.length < (utflen + 2))) {
- dos.bytearr = new byte[(utflen * 2) + 2];
- }
- bytearr = dos.bytearr;
- } else {
- bytearr = new byte[utflen + 2];
- }
-
- bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
- bytearr[count++] = (byte) ((utflen) & 0xFF);
-
- int i = 0;
- for (i = 0; i < strlen; i++) {
- c = str.charAt(i);
- if (!((c >= 0x0001) && (c <= 0x007F))) {
- break;
- }
- bytearr[count++] = (byte) c;
- }
-
- for (; i < strlen; i++) {
- c = str.charAt(i);
- if ((c >= 0x0001) && (c <= 0x007F)) {
- bytearr[count++] = (byte) c;
-
- } else if (c > 0x07FF) {
- bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
- bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
- bytearr[count++] = (byte) (0x80 | ((c) & 0x3F));
- } else {
- bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
- bytearr[count++] = (byte) (0x80 | ((c) & 0x3F));
- }
- }
- out.write(bytearr, 0, utflen + 2);
- return utflen + 2;
- }
-
- /**
- * Returns the current value of the counter <code>written</code>, the number
- * of bytes written to this data output stream so far. If the counter
- * overflows, it will be wrapped to Integer.MAX_VALUE.
- *
- * @return the value of the <code>written</code> field.
- * @see java.io.DataOutputStream#written
- */
- public final int size() {
- return written;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/GZIPOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/GZIPOutputStream.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/GZIPOutputStream.java
deleted file mode 100644
index 875b838..0000000
--- a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/GZIPOutputStream.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * <p>
- * This class extends the {@link java.util.zip.GZIPOutputStream} by allowing the
- * constructor to provide a compression level, and uses a default value of 1,
- * rather than 5.
- * </p>
- */
-public class GZIPOutputStream extends java.util.zip.GZIPOutputStream {
-
- public static final int DEFAULT_COMPRESSION_LEVEL = 1;
-
- public GZIPOutputStream(final OutputStream out) throws IOException {
- this(out, DEFAULT_COMPRESSION_LEVEL);
- }
-
- public GZIPOutputStream(final OutputStream out, final int compressionLevel) throws IOException {
- super(out);
- def.setLevel(compressionLevel);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/LeakyBucketStreamThrottler.java
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/LeakyBucketStreamThrottler.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/LeakyBucketStreamThrottler.java
deleted file mode 100644
index 0ebe16d..0000000
--- a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/LeakyBucketStreamThrottler.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class LeakyBucketStreamThrottler implements StreamThrottler {
-
- private final int maxBytesPerSecond;
- private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<Request>();
- private final ScheduledExecutorService executorService;
- private final AtomicBoolean shutdown = new AtomicBoolean(false);
-
- public LeakyBucketStreamThrottler(final int maxBytesPerSecond) {
- this.maxBytesPerSecond = maxBytesPerSecond;
-
- executorService = Executors.newSingleThreadScheduledExecutor();
- final Runnable task = new Drain();
- executorService.scheduleAtFixedRate(task, 0, 1000, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void close() {
- this.shutdown.set(true);
-
- executorService.shutdown();
- try {
- // Should not take more than 2 seconds because we run every second. If it takes more than
- // 2 seconds, it is because the Runnable thread is blocking on a write; in this case,
- // we will just ignore it and return
- executorService.awaitTermination(2, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- }
- }
-
- @Override
- public OutputStream newThrottledOutputStream(final OutputStream toWrap) {
- return new OutputStream() {
- @Override
- public void write(final int b) throws IOException {
- write(new byte[]{(byte) b}, 0, 1);
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- write(b, 0, b.length);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- final InputStream in = new ByteArrayInputStream(b, off, len);
- LeakyBucketStreamThrottler.this.copy(in, toWrap);
- }
-
- @Override
- public void close() throws IOException {
- toWrap.close();
- }
-
- @Override
- public void flush() throws IOException {
- toWrap.flush();
- }
- };
- }
-
- @Override
- public InputStream newThrottledInputStream(final InputStream toWrap) {
- return new InputStream() {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- @Override
- public int read() throws IOException {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream(1);
- LeakyBucketStreamThrottler.this.copy(toWrap, baos, 1L);
- if (baos.getBufferLength() < 1) {
- return -1;
- }
-
- return baos.getUnderlyingBuffer()[0] & 0xFF;
- }
-
- @Override
- public int read(final byte[] b) throws IOException {
- if(b.length == 0){
- return 0;
- }
- return read(b, 0, b.length);
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- baos.reset();
- final int copied = (int) LeakyBucketStreamThrottler.this.copy(toWrap, baos, len);
- if (copied == 0) {
- return -1;
- }
- System.arraycopy(baos.getUnderlyingBuffer(), 0, b, off, copied);
- return copied;
- }
-
- @Override
- public void close() throws IOException {
- toWrap.close();
- }
-
- @Override
- public int available() throws IOException {
- return toWrap.available();
- }
- };
- }
-
- @Override
- public long copy(final InputStream in, final OutputStream out) throws IOException {
- return copy(in, out, -1);
- }
-
- @Override
- public long copy(final InputStream in, final OutputStream out, final long maxBytes) throws IOException {
- long totalBytesCopied = 0;
- boolean finished = false;
- while (!finished) {
- final long requestMax = (maxBytes < 0) ? Long.MAX_VALUE : maxBytes - totalBytesCopied;
- final Request request = new Request(in, out, requestMax);
- boolean transferred = false;
- while (!transferred) {
- if (shutdown.get()) {
- throw new IOException("Throttler shutdown");
- }
-
- try {
- transferred = requestQueue.offer(request, 1000, TimeUnit.MILLISECONDS);
- } catch (final InterruptedException e) {
- throw new IOException("Interrupted", e);
- }
- }
-
- final BlockingQueue<Response> responseQueue = request.getResponseQueue();
- Response response = null;
- while (response == null) {
- try {
- if (shutdown.get()) {
- throw new IOException("Throttler shutdown");
- }
- response = responseQueue.poll(1000L, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted", e);
- }
- }
-
- if (!response.isSuccess()) {
- throw response.getError();
- }
-
- totalBytesCopied += response.getBytesCopied();
- finished = (response.getBytesCopied() == 0) || (totalBytesCopied >= maxBytes && maxBytes > 0);
- }
-
- return totalBytesCopied;
- }
-
- /**
- * This class is responsible for draining water from the leaky bucket. I.e.,
- * it actually moves the data
- */
- private class Drain implements Runnable {
-
- private final byte[] buffer;
-
- public Drain() {
- final int bufferSize = Math.min(4096, maxBytesPerSecond);
- buffer = new byte[bufferSize];
- }
-
- @Override
- public void run() {
- final long start = System.currentTimeMillis();
-
- int bytesTransferred = 0;
- while (bytesTransferred < maxBytesPerSecond) {
- final long maxMillisToWait = 1000 - (System.currentTimeMillis() - start);
- if (maxMillisToWait < 1) {
- return;
- }
-
- try {
- final Request request = requestQueue.poll(maxMillisToWait, TimeUnit.MILLISECONDS);
- if (request == null) {
- return;
- }
-
- final BlockingQueue<Response> responseQueue = request.getResponseQueue();
-
- final OutputStream out = request.getOutputStream();
- final InputStream in = request.getInputStream();
-
- try {
- final long requestMax = request.getMaxBytesToCopy();
- long maxBytesToTransfer;
- if (requestMax < 0) {
- maxBytesToTransfer = Math.min(buffer.length, maxBytesPerSecond - bytesTransferred);
- } else {
- maxBytesToTransfer = Math.min(requestMax,
- Math.min(buffer.length, maxBytesPerSecond - bytesTransferred));
- }
- maxBytesToTransfer = Math.max(1L, maxBytesToTransfer);
-
- final int bytesCopied = fillBuffer(in, maxBytesToTransfer);
- out.write(buffer, 0, bytesCopied);
-
- final Response response = new Response(true, bytesCopied);
- responseQueue.put(response);
- bytesTransferred += bytesCopied;
- } catch (final IOException e) {
- final Response response = new Response(e);
- responseQueue.put(response);
- }
- } catch (InterruptedException e) {
- }
- }
- }
-
- private int fillBuffer(final InputStream in, final long maxBytes) throws IOException {
- int bytesRead = 0;
- int len;
- while (bytesRead < maxBytes && (len = in.read(buffer, bytesRead, (int) Math.min(maxBytes - bytesRead, buffer.length - bytesRead))) > 0) {
- bytesRead += len;
- }
-
- return bytesRead;
- }
- }
-
- private static class Response {
-
- private final boolean success;
- private final IOException error;
- private final int bytesCopied;
-
- public Response(final boolean success, final int bytesCopied) {
- this.success = success;
- this.bytesCopied = bytesCopied;
- this.error = null;
- }
-
- public Response(final IOException error) {
- this.success = false;
- this.error = error;
- this.bytesCopied = -1;
- }
-
- public boolean isSuccess() {
- return success;
- }
-
- public IOException getError() {
- return error;
- }
-
- public int getBytesCopied() {
- return bytesCopied;
- }
- }
-
- private static class Request {
-
- private final OutputStream out;
- private final InputStream in;
- private final long maxBytesToCopy;
- private final BlockingQueue<Response> responseQueue;
-
- public Request(final InputStream in, final OutputStream out, final long maxBytesToCopy) {
- this.out = out;
- this.in = in;
- this.maxBytesToCopy = maxBytesToCopy;
- this.responseQueue = new LinkedBlockingQueue<Response>(1);
- }
-
- public BlockingQueue<Response> getResponseQueue() {
- return this.responseQueue;
- }
-
- public OutputStream getOutputStream() {
- return out;
- }
-
- public InputStream getInputStream() {
- return in;
- }
-
- public long getMaxBytesToCopy() {
- return maxBytesToCopy;
- }
-
- @Override
- public String toString() {
- return "Request[maxBytes=" + maxBytesToCopy + "]";
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NonCloseableInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NonCloseableInputStream.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NonCloseableInputStream.java
deleted file mode 100644
index 1fbb093..0000000
--- a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NonCloseableInputStream.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Wraps and InputStream so that the underlying InputStream cannot be closed.
- * This is used so that the InputStream can be wrapped with yet another
- * InputStream and prevent the outer layer from closing the inner InputStream
- */
-public class NonCloseableInputStream extends FilterInputStream {
-
- private final InputStream toWrap;
-
- public NonCloseableInputStream(final InputStream toWrap) {
- super(toWrap);
- this.toWrap = toWrap;
- }
-
- @Override
- public int read() throws IOException {
- return toWrap.read();
- }
-
- @Override
- public int read(byte[] b) throws IOException {
- return toWrap.read(b);
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- return toWrap.read(b, off, len);
- }
-
- @Override
- public void close() throws IOException {
- // do nothing
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NonCloseableOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NonCloseableOutputStream.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NonCloseableOutputStream.java
deleted file mode 100644
index 731e409..0000000
--- a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NonCloseableOutputStream.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class NonCloseableOutputStream extends FilterOutputStream {
-
- private final OutputStream out;
-
- public NonCloseableOutputStream(final OutputStream out) {
- super(out);
- this.out = out;
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- out.write(b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- out.write(b, off, len);
- }
-
- @Override
- public void write(int b) throws IOException {
- out.write(b);
- }
-
- @Override
- public void close() throws IOException {
- out.flush();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NullOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NullOutputStream.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NullOutputStream.java
deleted file mode 100644
index 60475d4..0000000
--- a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NullOutputStream.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * OutputStream that throws away all data, just like as if writing to /dev/null
- */
-public class NullOutputStream extends OutputStream {
-
- @Override
- public void write(final int b) throws IOException {
- }
-
- @Override
- public void write(final byte[] b) throws IOException {
- }
-
- @Override
- public void write(final byte[] b, int off, int len) throws IOException {
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public void flush() throws IOException {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/StreamThrottler.java
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/StreamThrottler.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/StreamThrottler.java
deleted file mode 100644
index 8c2aa80..0000000
--- a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/StreamThrottler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public interface StreamThrottler extends Closeable {
-
- long copy(InputStream in, OutputStream out) throws IOException;
-
- long copy(InputStream in, OutputStream out, long maxBytes) throws IOException;
-
- InputStream newThrottledInputStream(final InputStream toWrap);
-
- OutputStream newThrottledOutputStream(final OutputStream toWrap);
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/StreamUtils.java
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/StreamUtils.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/StreamUtils.java
deleted file mode 100644
index 1596014..0000000
--- a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/StreamUtils.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.nifi.io.exception.BytePatternNotFoundException;
-import org.apache.nifi.io.util.NonThreadSafeCircularBuffer;
-
-public class StreamUtils {
-
- public static long copy(final InputStream source, final OutputStream destination) throws IOException {
- final byte[] buffer = new byte[8192];
- int len;
- long totalCount = 0L;
- while ((len = source.read(buffer)) > 0) {
- destination.write(buffer, 0, len);
- totalCount += len;
- }
- return totalCount;
- }
-
- /**
- * Copies <code>numBytes</code> from <code>source</code> to
- * <code>destination</code>. If <code>numBytes</code> are not available from
- * <code>source</code>, throws EOFException
- *
- * @param source
- * @param destination
- * @param numBytes
- * @throws IOException
- */
- public static void copy(final InputStream source, final OutputStream destination, final long numBytes) throws IOException {
- final byte[] buffer = new byte[8192];
- int len;
- long bytesLeft = numBytes;
- while ((len = source.read(buffer, 0, (int) Math.min(bytesLeft, buffer.length))) > 0) {
- destination.write(buffer, 0, len);
- bytesLeft -= len;
- }
-
- if (bytesLeft > 0) {
- throw new EOFException("Attempted to copy " + numBytes + " bytes but only " + (numBytes - bytesLeft) + " bytes were available");
- }
- }
-
- /**
- * Reads data from the given input stream, copying it to the destination
- * byte array. If the InputStream has less data than the given byte array,
- * throws an EOFException
- *
- * @param source
- * @param destination
- * @throws IOException
- */
- public static void fillBuffer(final InputStream source, final byte[] destination) throws IOException {
- fillBuffer(source, destination, true);
- }
-
- /**
- * Reads data from the given input stream, copying it to the destination
- * byte array. If the InputStream has less data than the given byte array,
- * throws an EOFException if <code>ensureCapacity</code> is true and
- * otherwise returns the number of bytes copied
- *
- * @param source
- * @param destination
- * @param ensureCapacity whether or not to enforce that the InputStream have
- * at least as much data as the capacity of the destination byte array
- * @return
- * @throws IOException
- */
- public static int fillBuffer(final InputStream source, final byte[] destination, final boolean ensureCapacity) throws IOException {
- int bytesRead = 0;
- int len;
- while (bytesRead < destination.length) {
- len = source.read(destination, bytesRead, destination.length - bytesRead);
- if (len < 0) {
- if (ensureCapacity) {
- throw new EOFException();
- } else {
- break;
- }
- }
-
- bytesRead += len;
- }
-
- return bytesRead;
- }
-
- /**
- * Copies data from in to out until either we are out of data (returns null)
- * or we hit one of the byte patterns identified by the
- * <code>stoppers</code> parameter (returns the byte pattern matched). The
- * bytes in the stopper will be copied.
- *
- * @param in
- * @param out
- * @param maxBytes
- * @param stoppers
- * @return the byte array matched, or null if end of stream was reached
- * @throws IOException
- */
- public static byte[] copyInclusive(final InputStream in, final OutputStream out, final int maxBytes, final byte[]... stoppers) throws IOException {
- if (stoppers.length == 0) {
- return null;
- }
-
- final List<NonThreadSafeCircularBuffer> circularBuffers = new ArrayList<NonThreadSafeCircularBuffer>();
- for (final byte[] stopper : stoppers) {
- circularBuffers.add(new NonThreadSafeCircularBuffer(stopper));
- }
-
- long bytesRead = 0;
- while (true) {
- final int next = in.read();
- if (next == -1) {
- return null;
- } else if (maxBytes > 0 && ++bytesRead >= maxBytes) {
- throw new BytePatternNotFoundException("Did not encounter any byte pattern that was expected; data does not appear to be in the expected format");
- }
-
- out.write(next);
-
- for (final NonThreadSafeCircularBuffer circ : circularBuffers) {
- if (circ.addAndCompare((byte) next)) {
- return circ.getByteArray();
- }
- }
- }
- }
-
- /**
- * Copies data from in to out until either we are out of data (returns null)
- * or we hit one of the byte patterns identified by the
- * <code>stoppers</code> parameter (returns the byte pattern matched). The
- * byte pattern matched will NOT be copied to the output and will be un-read
- * from the input.
- *
- * @param in
- * @param out
- * @param maxBytes
- * @param stoppers
- * @return the byte array matched, or null if end of stream was reached
- * @throws IOException
- */
- public static byte[] copyExclusive(final InputStream in, final OutputStream out, final int maxBytes, final byte[]... stoppers) throws IOException {
- if (stoppers.length == 0) {
- return null;
- }
-
- int longest = 0;
- NonThreadSafeCircularBuffer longestBuffer = null;
- final List<NonThreadSafeCircularBuffer> circularBuffers = new ArrayList<NonThreadSafeCircularBuffer>();
- for (final byte[] stopper : stoppers) {
- final NonThreadSafeCircularBuffer circularBuffer = new NonThreadSafeCircularBuffer(stopper);
- if (stopper.length > longest) {
- longest = stopper.length;
- longestBuffer = circularBuffer;
- circularBuffers.add(0, circularBuffer);
- } else {
- circularBuffers.add(circularBuffer);
- }
- }
-
- long bytesRead = 0;
- while (true) {
- final int next = in.read();
- if (next == -1) {
- return null;
- } else if (maxBytes > 0 && bytesRead++ > maxBytes) {
- throw new BytePatternNotFoundException("Did not encounter any byte pattern that was expected; data does not appear to be in the expected format");
- }
-
- for (final NonThreadSafeCircularBuffer circ : circularBuffers) {
- if (circ.addAndCompare((byte) next)) {
- // The longest buffer has some data that may not have been written out yet; we need to make sure
- // that we copy out those bytes.
- final int bytesToCopy = longest - circ.getByteArray().length;
- for (int i = 0; i < bytesToCopy; i++) {
- final int oldestByte = longestBuffer.getOldestByte();
- if (oldestByte != -1) {
- out.write(oldestByte);
- longestBuffer.addAndCompare((byte) 0);
- }
- }
-
- return circ.getByteArray();
- }
- }
-
- if (longestBuffer.isFilled()) {
- out.write(longestBuffer.getOldestByte());
- }
- }
- }
-
- /**
- * Skips the specified number of bytes from the InputStream
- *
- * If unable to skip that number of bytes, throws EOFException
- *
- * @param stream
- * @param bytesToSkip
- * @throws IOException
- */
- public static void skip(final InputStream stream, final long bytesToSkip) throws IOException {
- if (bytesToSkip <= 0) {
- return;
- }
- long totalSkipped = 0L;
-
- // If we have a FileInputStream, calling skip(1000000) will return 1000000 even if the file is only
- // 3 bytes. As a result, we will skip 1 less than the number requested, and then read the last
- // byte in order to make sure that we've consumed the number of bytes requested. We then check that
- // the final byte, which we read, is not -1.
- final long actualBytesToSkip = bytesToSkip - 1;
- while (totalSkipped < actualBytesToSkip) {
- final long skippedThisIteration = stream.skip(actualBytesToSkip - totalSkipped);
- if (skippedThisIteration == 0) {
- final int nextByte = stream.read();
- if (nextByte == -1) {
- throw new EOFException();
- } else {
- totalSkipped++;
- }
- }
-
- totalSkipped += skippedThisIteration;
- }
-
- final int lastByte = stream.read();
- if (lastByte == -1) {
- throw new EOFException();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/ZipOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/ZipOutputStream.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/ZipOutputStream.java
deleted file mode 100644
index f285720..0000000
--- a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/ZipOutputStream.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.OutputStream;
-
-/**
- * This class extends the {@link java.util.zip.ZipOutputStream} by providing a
- * constructor that allows the user to specify the compression level. The
- * default compression level is 1, as opposed to Java's default of 5.
- */
-public class ZipOutputStream extends java.util.zip.ZipOutputStream {
-
- public static final int DEFAULT_COMPRESSION_LEVEL = 1;
-
- public ZipOutputStream(final OutputStream out) {
- this(out, DEFAULT_COMPRESSION_LEVEL);
- }
-
- public ZipOutputStream(final OutputStream out, final int compressionLevel) {
- super(out);
- def.setLevel(compressionLevel);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/exception/BytePatternNotFoundException.java
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/exception/BytePatternNotFoundException.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/exception/BytePatternNotFoundException.java
deleted file mode 100644
index 8935767..0000000
--- a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/exception/BytePatternNotFoundException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.exception;
-
-import java.io.IOException;
-
-public class BytePatternNotFoundException extends IOException {
-
- private static final long serialVersionUID = -4128911284318513973L;
-
- public BytePatternNotFoundException(final String explanation) {
- super(explanation);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/util/NonThreadSafeCircularBuffer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/util/NonThreadSafeCircularBuffer.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/util/NonThreadSafeCircularBuffer.java
deleted file mode 100644
index 1b87488..0000000
--- a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/util/NonThreadSafeCircularBuffer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.util;
-
-import java.util.Arrays;
-
-public class NonThreadSafeCircularBuffer {
-
- private final byte[] lookingFor;
- private final int[] buffer;
- private int insertionPointer = 0;
- private int bufferSize = 0;
-
- public NonThreadSafeCircularBuffer(final byte[] lookingFor) {
- this.lookingFor = lookingFor;
- buffer = new int[lookingFor.length];
- Arrays.fill(buffer, -1);
- }
-
- public byte[] getByteArray() {
- return lookingFor;
- }
-
- /**
- * Returns the oldest byte in the buffer
- *
- * @return
- */
- public int getOldestByte() {
- return buffer[insertionPointer];
- }
-
- public boolean isFilled() {
- return bufferSize >= buffer.length;
- }
-
- public boolean addAndCompare(final byte data) {
- buffer[insertionPointer] = data;
- insertionPointer = (insertionPointer + 1) % lookingFor.length;
-
- bufferSize++;
- if (bufferSize < lookingFor.length) {
- return false;
- }
-
- for (int i = 0; i < lookingFor.length; i++) {
- final byte compare = (byte) buffer[(insertionPointer + i) % lookingFor.length];
- if (compare != lookingFor[i]) {
- return false;
- }
- }
-
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-stream-utils/src/test/java/org/apache/nifi/io/TestLeakyBucketThrottler.java
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/test/java/org/apache/nifi/io/TestLeakyBucketThrottler.java b/commons/nifi-stream-utils/src/test/java/org/apache/nifi/io/TestLeakyBucketThrottler.java
deleted file mode 100644
index 12e1801..0000000
--- a/commons/nifi-stream-utils/src/test/java/org/apache/nifi/io/TestLeakyBucketThrottler.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-@Ignore("Tests are time-based")
-public class TestLeakyBucketThrottler {
-
- @Test(timeout = 10000)
- public void testOutputStreamInterface() throws IOException {
- // throttle rate at 1 MB/sec
- final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);
-
- final byte[] data = new byte[1024 * 1024 * 4];
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final OutputStream throttledOut = throttler.newThrottledOutputStream(baos);
-
- final long start = System.currentTimeMillis();
- throttledOut.write(data);
- throttler.close();
- final long millis = System.currentTimeMillis() - start;
- // should take 4 sec give or take
- assertTrue(millis > 3000);
- assertTrue(millis < 6000);
- }
-
- @Test(timeout = 10000)
- public void testInputStreamInterface() throws IOException {
- // throttle rate at 1 MB/sec
- final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);
-
- final byte[] data = new byte[1024 * 1024 * 4];
- final ByteArrayInputStream bais = new ByteArrayInputStream(data);
- final InputStream throttledIn = throttler.newThrottledInputStream(bais);
-
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- final byte[] buffer = new byte[4096];
- final long start = System.currentTimeMillis();
- int len;
- while ((len = throttledIn.read(buffer)) > 0) {
- baos.write(buffer, 0, len);
- }
- throttler.close();
- final long millis = System.currentTimeMillis() - start;
- // should take 4 sec give or take
- assertTrue(millis > 3000);
- assertTrue(millis < 6000);
- baos.close();
- }
-
- @Test(timeout = 10000)
- public void testDirectInterface() throws IOException, InterruptedException {
- // throttle rate at 1 MB/sec
- final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);
-
- // create 3 threads, each sending ~2 MB
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final List<Thread> threads = new ArrayList<Thread>();
- for (int i = 0; i < 3; i++) {
- final Thread t = new WriterThread(i, throttler, baos);
- threads.add(t);
- }
-
- final long start = System.currentTimeMillis();
- for (final Thread t : threads) {
- t.start();
- }
-
- for (final Thread t : threads) {
- t.join();
- }
- final long elapsed = System.currentTimeMillis() - start;
-
- throttler.close();
-
- // To send 15 MB, it should have taken at least 5 seconds and no more than 7 seconds, to
- // allow for busy-ness and the fact that we could write a tiny bit more than the limit.
- assertTrue(elapsed > 5000);
- assertTrue(elapsed < 7000);
-
- // ensure bytes were copied out appropriately
- assertEquals(3 * (2 * 1024 * 1024 + 1), baos.getBufferLength());
- assertEquals((byte) 'A', baos.getUnderlyingBuffer()[baos.getBufferLength() - 1]);
- }
-
- private static class WriterThread extends Thread {
-
- private final int idx;
- private final byte[] data = new byte[1024 * 1024 * 2 + 1];
- private final LeakyBucketStreamThrottler throttler;
- private final OutputStream out;
-
- public WriterThread(final int idx, final LeakyBucketStreamThrottler throttler, final OutputStream out) {
- this.idx = idx;
- this.throttler = throttler;
- this.out = out;
- this.data[this.data.length - 1] = (byte) 'A';
- }
-
- @Override
- public void run() {
- long startMillis = System.currentTimeMillis();
- long bytesWritten = 0L;
- try {
- throttler.copy(new ByteArrayInputStream(data), out);
- } catch (IOException e) {
- e.printStackTrace();
- return;
- }
- long now = System.currentTimeMillis();
- long millisElapsed = now - startMillis;
- bytesWritten += data.length;
- float bytesPerSec = (float) bytesWritten / (float) millisElapsed * 1000F;
- System.out.println(idx + " : copied data at a rate of " + bytesPerSec + " bytes/sec");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-stream-utils/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/src/test/resources/logback-test.xml b/commons/nifi-stream-utils/src/test/resources/logback-test.xml
deleted file mode 100644
index 0f3f60c..0000000
--- a/commons/nifi-stream-utils/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,48 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<configuration scan="true" scanPeriod="30 seconds">
- <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
- <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
- <pattern>%-4r [%t] %-5p %c - %m%n</pattern>
- </encoder>
- </appender>
-
- <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR -->
- <logger name="org.apache.nifi" level="DEBUG"/>
-
- <!-- Logger for managing logging statements for nifi clusters. -->
- <logger name="org.apache.nifi.cluster" level="INFO"/>
-
- <!--
- Logger for logging HTTP requests received by the web server. Setting
- log level to 'debug' activates HTTP request logging.
- -->
- <logger name="org.apache.nifi.server.JettyServer" level="INFO"/>
-
- <!-- Logger for managing logging statements for jetty -->
- <logger name="org.mortbay" level="INFO"/>
-
- <!-- Suppress non-error messages due to excessive logging by class -->
- <logger name="com.sun.jersey.spi.container.servlet.WebComponent" level="ERROR"/>
-
- <logger name="org.apache.nifi.processors.standard" level="DEBUG"/>
-
- <root level="INFO">
- <appender-ref ref="CONSOLE"/>
- </root>
-
-</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/pom.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/pom.xml b/commons/nifi-utils/pom.xml
index 6a6cee1..c5c2a68 100644
--- a/commons/nifi-utils/pom.xml
+++ b/commons/nifi-utils/pom.xml
@@ -26,4 +26,8 @@
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>NiFi Utils</name>
+ <!--
+ This project intentionally has no additional dependencies beyond that pulled in by the parent. It is a general purpose utility library
+ and should keep its surface/tension minimal.
+ -->
</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java b/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java
new file mode 100644
index 0000000..24f43ca
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowfile.attributes;
+
+public enum CoreAttributes implements FlowFileAttributeKey {
+ /**
+ * The flowfile's path indicates the relative directory to which a FlowFile belongs and does not
+ * contain the filename
+ */
+ PATH("path"),
+
+ /**
+ * The flowfile's absolute path indicates the absolute directory to which a FlowFile belongs and does not
+ * contain the filename
+ */
+ ABSOLUTE_PATH("absolute.path"),
+
+ /**
+ * The filename of the FlowFile. The filename should not contain any directory structure.
+ */
+ FILENAME("filename"),
+
+ /**
+ * A unique UUID assigned to this FlowFile
+ */
+ UUID("uuid"),
+
+ /**
+ * A numeric value indicating the FlowFile priority
+ */
+ PRIORITY("priority"),
+
+ /**
+ * The MIME Type of this FlowFile
+ */
+ MIME_TYPE("mime.type"),
+
+ /**
+ * Specifies the reason that a FlowFile is being discarded
+ */
+ DISCARD_REASON("discard.reason"),
+
+ /**
+ * Indicates an identifier other than the FlowFile's UUID that is known to refer to this FlowFile.
+ */
+ ALTERNATE_IDENTIFIER("alternate.identifier");
+
+ private final String key;
+ private CoreAttributes(final String key) {
+ this.key = key;
+ }
+
+ @Override
+ public String key() {
+ return key;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java b/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java
new file mode 100644
index 0000000..cc6c28e
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowfile.attributes;
+
+public interface FlowFileAttributeKey {
+ String key();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/io/CompoundUpdateMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/CompoundUpdateMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/CompoundUpdateMonitor.java
deleted file mode 100644
index e22032b..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/io/CompoundUpdateMonitor.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * An {@link UpdateMonitor} that combines multiple <code>UpdateMonitor</code>s
- * such that it will indicate a change in a file only if ALL sub-monitors
- * indicate a change. The sub-monitors will be applied in the order given and if
- * any indicates that the state has not changed, the subsequent sub-monitors may
- * not be given a chance to run
- */
-public class CompoundUpdateMonitor implements UpdateMonitor {
-
- private final List<UpdateMonitor> monitors;
-
- public CompoundUpdateMonitor(final UpdateMonitor first, final UpdateMonitor... others) {
- monitors = new ArrayList<>();
- monitors.add(first);
- for (final UpdateMonitor monitor : others) {
- monitors.add(monitor);
- }
- }
-
- @Override
- public Object getCurrentState(final Path path) throws IOException {
- return new DeferredMonitorAction(monitors, path);
- }
-
- private static class DeferredMonitorAction {
-
- private static final Object NON_COMPUTED_VALUE = new Object();
-
- private final List<UpdateMonitor> monitors;
- private final Path path;
-
- private final Object[] preCalculated;
-
- public DeferredMonitorAction(final List<UpdateMonitor> monitors, final Path path) {
- this.monitors = monitors;
- this.path = path;
- preCalculated = new Object[monitors.size()];
-
- for (int i = 0; i < preCalculated.length; i++) {
- preCalculated[i] = NON_COMPUTED_VALUE;
- }
- }
-
- private Object getCalculatedValue(final int i) throws IOException {
- if (preCalculated[i] == NON_COMPUTED_VALUE) {
- preCalculated[i] = monitors.get(i).getCurrentState(path);
- }
-
- return preCalculated[i];
- }
-
- @Override
- public boolean equals(final Object obj) {
- // must return true unless ALL DeferredMonitorAction's indicate that they are different
- if (obj == null) {
- return false;
- }
-
- if (!(obj instanceof DeferredMonitorAction)) {
- return false;
- }
-
- final DeferredMonitorAction other = (DeferredMonitorAction) obj;
- try {
- // Go through each UpdateMonitor's value and check if the value has changed.
- for (int i = 0; i < preCalculated.length; i++) {
- final Object mine = getCalculatedValue(i);
- final Object theirs = other.getCalculatedValue(i);
-
- if (mine == theirs) {
- // same
- return true;
- }
-
- if (mine == null && theirs == null) {
- // same
- return true;
- }
-
- if (mine.equals(theirs)) {
- return true;
- }
- }
- } catch (final IOException e) {
- return false;
- }
-
- // No DeferredMonitorAction was the same as last time. Therefore, it's not equal
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/io/LastModifiedMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/LastModifiedMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/LastModifiedMonitor.java
deleted file mode 100644
index f446465..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/io/LastModifiedMonitor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-public class LastModifiedMonitor implements UpdateMonitor {
-
- @Override
- public Object getCurrentState(final Path path) throws IOException {
- return Files.getLastModifiedTime(path);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/io/MD5SumMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/MD5SumMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/MD5SumMonitor.java
deleted file mode 100644
index 1326c2a..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/io/MD5SumMonitor.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Path;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-
-public class MD5SumMonitor implements UpdateMonitor {
-
- @Override
- public Object getCurrentState(final Path path) throws IOException {
- final MessageDigest digest;
- try {
- digest = MessageDigest.getInstance("MD5");
- } catch (final NoSuchAlgorithmException nsae) {
- throw new AssertionError(nsae);
- }
-
- try (final FileInputStream fis = new FileInputStream(path.toFile())) {
- int len;
- final byte[] buffer = new byte[8192];
- while ((len = fis.read(buffer)) > 0) {
- digest.update(buffer, 0, len);
- }
- }
-
- // Return a ByteBuffer instead of byte[] because we want equals() to do a deep equality
- return ByteBuffer.wrap(digest.digest());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/io/SynchronousFileWatcher.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/SynchronousFileWatcher.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/SynchronousFileWatcher.java
deleted file mode 100644
index 785f1ac..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/io/SynchronousFileWatcher.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Allows the user to configure a {@link java.nio.file.Path Path} to watch for
- * modifications and periodically poll to check if the file has been modified
- */
-public class SynchronousFileWatcher {
-
- private final Path path;
- private final long checkUpdateMillis;
- private final UpdateMonitor monitor;
- private final AtomicReference<StateWrapper> lastState;
- private final Lock resourceLock = new ReentrantLock();
-
- public SynchronousFileWatcher(final Path path, final UpdateMonitor monitor) {
- this(path, monitor, 0L);
- }
-
- public SynchronousFileWatcher(final Path path, final UpdateMonitor monitor, final long checkMillis) {
- if (checkMillis < 0) {
- throw new IllegalArgumentException();
- }
-
- this.path = path;
- checkUpdateMillis = checkMillis;
- this.monitor = monitor;
-
- Object currentState;
- try {
- currentState = monitor.getCurrentState(path);
- } catch (final IOException e) {
- currentState = null;
- }
-
- this.lastState = new AtomicReference<>(new StateWrapper(currentState));
- }
-
- /**
- * Checks if the file has been updated according to the configured
- * {@link UpdateMonitor} and resets the state
- *
- * @return
- * @throws IOException
- */
- public boolean checkAndReset() throws IOException {
- if (checkUpdateMillis <= 0) { // if checkUpdateMillis <= 0, always check
- return checkForUpdate();
- } else {
- final StateWrapper stateWrapper = lastState.get();
- if (stateWrapper.getTimestamp() < System.currentTimeMillis() - checkUpdateMillis) {
- return checkForUpdate();
- }
- return false;
- }
- }
-
- private boolean checkForUpdate() throws IOException {
- if (resourceLock.tryLock()) {
- try {
- final StateWrapper wrapper = lastState.get();
- final Object newState = monitor.getCurrentState(path);
- if (newState == null && wrapper.getState() == null) {
- return false;
- }
- if (newState == null || wrapper.getState() == null) {
- lastState.set(new StateWrapper(newState));
- return true;
- }
-
- final boolean unmodified = newState.equals(wrapper.getState());
- if (!unmodified) {
- lastState.set(new StateWrapper(newState));
- }
- return !unmodified;
- } finally {
- resourceLock.unlock();
- }
- } else {
- return false;
- }
- }
-
- private static class StateWrapper {
-
- private final Object state;
- private final long timestamp;
-
- public StateWrapper(final Object state) {
- this.state = state;
- this.timestamp = System.currentTimeMillis();
- }
-
- public Object getState() {
- return state;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
- }
-}