You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/01/31 04:44:14 UTC
[37/62] [abbrv] [partial] incubator-nifi git commit: NIFI-270 Made
all changes identified by adam, mark, joey to prep for a cleaner build
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
deleted file mode 100644
index ae075b5..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class LeakyBucketStreamThrottler implements StreamThrottler {
-
- private final int maxBytesPerSecond;
- private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<Request>();
- private final ScheduledExecutorService executorService;
- private final AtomicBoolean shutdown = new AtomicBoolean(false);
-
- public LeakyBucketStreamThrottler(final int maxBytesPerSecond) {
- this.maxBytesPerSecond = maxBytesPerSecond;
-
- executorService = Executors.newSingleThreadScheduledExecutor();
- final Runnable task = new Drain();
- executorService.scheduleAtFixedRate(task, 0, 1000, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void close() {
- this.shutdown.set(true);
-
- executorService.shutdown();
- try {
- // Should not take more than 2 seconds because we run every second. If it takes more than
- // 2 seconds, it is because the Runnable thread is blocking on a write; in this case,
- // we will just ignore it and return
- executorService.awaitTermination(2, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- }
- }
-
- @Override
- public OutputStream newThrottledOutputStream(final OutputStream toWrap) {
- return new OutputStream() {
- @Override
- public void write(final int b) throws IOException {
- write(new byte[]{(byte) b}, 0, 1);
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- write(b, 0, b.length);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- final InputStream in = new ByteArrayInputStream(b, off, len);
- LeakyBucketStreamThrottler.this.copy(in, toWrap);
- }
-
- @Override
- public void close() throws IOException {
- toWrap.close();
- }
-
- @Override
- public void flush() throws IOException {
- toWrap.flush();
- }
- };
- }
-
- @Override
- public InputStream newThrottledInputStream(final InputStream toWrap) {
- return new InputStream() {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- @Override
- public int read() throws IOException {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream(1);
- LeakyBucketStreamThrottler.this.copy(toWrap, baos, 1L);
- if (baos.getBufferLength() < 1) {
- return -1;
- }
-
- return baos.getUnderlyingBuffer()[0] & 0xFF;
- }
-
- @Override
- public int read(final byte[] b) throws IOException {
- if(b.length == 0){
- return 0;
- }
- return read(b, 0, b.length);
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- if ( len < 0 ) {
- throw new IllegalArgumentException();
- }
- if ( len == 0 ) {
- return 0;
- }
-
- baos.reset();
- final int copied = (int) LeakyBucketStreamThrottler.this.copy(toWrap, baos, len);
- if (copied == 0) {
- return -1;
- }
- System.arraycopy(baos.getUnderlyingBuffer(), 0, b, off, copied);
- return copied;
- }
-
- @Override
- public void close() throws IOException {
- toWrap.close();
- }
-
- @Override
- public int available() throws IOException {
- return toWrap.available();
- }
- };
- }
-
- @Override
- public long copy(final InputStream in, final OutputStream out) throws IOException {
- return copy(in, out, -1);
- }
-
- @Override
- public long copy(final InputStream in, final OutputStream out, final long maxBytes) throws IOException {
- long totalBytesCopied = 0;
- boolean finished = false;
- while (!finished) {
- final long requestMax = (maxBytes < 0) ? Long.MAX_VALUE : maxBytes - totalBytesCopied;
- final Request request = new Request(in, out, requestMax);
- boolean transferred = false;
- while (!transferred) {
- if (shutdown.get()) {
- throw new IOException("Throttler shutdown");
- }
-
- try {
- transferred = requestQueue.offer(request, 1000, TimeUnit.MILLISECONDS);
- } catch (final InterruptedException e) {
- throw new IOException("Interrupted", e);
- }
- }
-
- final BlockingQueue<Response> responseQueue = request.getResponseQueue();
- Response response = null;
- while (response == null) {
- try {
- if (shutdown.get()) {
- throw new IOException("Throttler shutdown");
- }
- response = responseQueue.poll(1000L, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted", e);
- }
- }
-
- if (!response.isSuccess()) {
- throw response.getError();
- }
-
- totalBytesCopied += response.getBytesCopied();
- finished = (response.getBytesCopied() == 0) || (totalBytesCopied >= maxBytes && maxBytes > 0);
- }
-
- return totalBytesCopied;
- }
-
- /**
- * This class is responsible for draining water from the leaky bucket. I.e.,
- * it actually moves the data
- */
- private class Drain implements Runnable {
-
- private final byte[] buffer;
-
- public Drain() {
- final int bufferSize = Math.min(4096, maxBytesPerSecond);
- buffer = new byte[bufferSize];
- }
-
- @Override
- public void run() {
- final long start = System.currentTimeMillis();
-
- int bytesTransferred = 0;
- while (bytesTransferred < maxBytesPerSecond) {
- final long maxMillisToWait = 1000 - (System.currentTimeMillis() - start);
- if (maxMillisToWait < 1) {
- return;
- }
-
- try {
- final Request request = requestQueue.poll(maxMillisToWait, TimeUnit.MILLISECONDS);
- if (request == null) {
- return;
- }
-
- final BlockingQueue<Response> responseQueue = request.getResponseQueue();
-
- final OutputStream out = request.getOutputStream();
- final InputStream in = request.getInputStream();
-
- try {
- final long requestMax = request.getMaxBytesToCopy();
- long maxBytesToTransfer;
- if (requestMax < 0) {
- maxBytesToTransfer = Math.min(buffer.length, maxBytesPerSecond - bytesTransferred);
- } else {
- maxBytesToTransfer = Math.min(requestMax,
- Math.min(buffer.length, maxBytesPerSecond - bytesTransferred));
- }
- maxBytesToTransfer = Math.max(1L, maxBytesToTransfer);
-
- final int bytesCopied = fillBuffer(in, maxBytesToTransfer);
- out.write(buffer, 0, bytesCopied);
-
- final Response response = new Response(true, bytesCopied);
- responseQueue.put(response);
- bytesTransferred += bytesCopied;
- } catch (final IOException e) {
- final Response response = new Response(e);
- responseQueue.put(response);
- }
- } catch (InterruptedException e) {
- }
- }
- }
-
- private int fillBuffer(final InputStream in, final long maxBytes) throws IOException {
- int bytesRead = 0;
- int len;
- while (bytesRead < maxBytes && (len = in.read(buffer, bytesRead, (int) Math.min(maxBytes - bytesRead, buffer.length - bytesRead))) > 0) {
- bytesRead += len;
- }
-
- return bytesRead;
- }
- }
-
- private static class Response {
-
- private final boolean success;
- private final IOException error;
- private final int bytesCopied;
-
- public Response(final boolean success, final int bytesCopied) {
- this.success = success;
- this.bytesCopied = bytesCopied;
- this.error = null;
- }
-
- public Response(final IOException error) {
- this.success = false;
- this.error = error;
- this.bytesCopied = -1;
- }
-
- public boolean isSuccess() {
- return success;
- }
-
- public IOException getError() {
- return error;
- }
-
- public int getBytesCopied() {
- return bytesCopied;
- }
- }
-
- private static class Request {
-
- private final OutputStream out;
- private final InputStream in;
- private final long maxBytesToCopy;
- private final BlockingQueue<Response> responseQueue;
-
- public Request(final InputStream in, final OutputStream out, final long maxBytesToCopy) {
- this.out = out;
- this.in = in;
- this.maxBytesToCopy = maxBytesToCopy;
- this.responseQueue = new LinkedBlockingQueue<Response>(1);
- }
-
- public BlockingQueue<Response> getResponseQueue() {
- return this.responseQueue;
- }
-
- public OutputStream getOutputStream() {
- return out;
- }
-
- public InputStream getInputStream() {
- return in;
- }
-
- public long getMaxBytesToCopy() {
- return maxBytesToCopy;
- }
-
- @Override
- public String toString() {
- return "Request[maxBytes=" + maxBytesToCopy + "]";
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java
deleted file mode 100644
index 0e75a22..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Wraps and InputStream so that the underlying InputStream cannot be closed.
- * This is used so that the InputStream can be wrapped with yet another
- * InputStream and prevent the outer layer from closing the inner InputStream
- */
-public class NonCloseableInputStream extends FilterInputStream {
-
- private final InputStream toWrap;
-
- public NonCloseableInputStream(final InputStream toWrap) {
- super(toWrap);
- this.toWrap = toWrap;
- }
-
- @Override
- public int read() throws IOException {
- return toWrap.read();
- }
-
- @Override
- public int read(byte[] b) throws IOException {
- return toWrap.read(b);
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- return toWrap.read(b, off, len);
- }
-
- @Override
- public void close() throws IOException {
- // do nothing
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java
deleted file mode 100644
index 9c77637..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class NonCloseableOutputStream extends FilterOutputStream {
-
- private final OutputStream out;
-
- public NonCloseableOutputStream(final OutputStream out) {
- super(out);
- this.out = out;
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- out.write(b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- out.write(b, off, len);
- }
-
- @Override
- public void write(int b) throws IOException {
- out.write(b);
- }
-
- @Override
- public void close() throws IOException {
- out.flush();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java
deleted file mode 100644
index 8452761..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * OutputStream that throws away all data, just like as if writing to /dev/null
- */
-public class NullOutputStream extends OutputStream {
-
- @Override
- public void write(final int b) throws IOException {
- }
-
- @Override
- public void write(final byte[] b) throws IOException {
- }
-
- @Override
- public void write(final byte[] b, int off, int len) throws IOException {
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public void flush() throws IOException {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
deleted file mode 100644
index 9158050..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public interface StreamThrottler extends Closeable {
-
- long copy(InputStream in, OutputStream out) throws IOException;
-
- long copy(InputStream in, OutputStream out, long maxBytes) throws IOException;
-
- InputStream newThrottledInputStream(final InputStream toWrap);
-
- OutputStream newThrottledOutputStream(final OutputStream toWrap);
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java
deleted file mode 100644
index 8e3d606..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.nifi.stream.io.exception.BytePatternNotFoundException;
-import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
-
-public class StreamUtils {
-
- public static long copy(final InputStream source, final OutputStream destination) throws IOException {
- final byte[] buffer = new byte[8192];
- int len;
- long totalCount = 0L;
- while ((len = source.read(buffer)) > 0) {
- destination.write(buffer, 0, len);
- totalCount += len;
- }
- return totalCount;
- }
-
- /**
- * Copies <code>numBytes</code> from <code>source</code> to
- * <code>destination</code>. If <code>numBytes</code> are not available from
- * <code>source</code>, throws EOFException
- *
- * @param source
- * @param destination
- * @param numBytes
- * @throws IOException
- */
- public static void copy(final InputStream source, final OutputStream destination, final long numBytes) throws IOException {
- final byte[] buffer = new byte[8192];
- int len;
- long bytesLeft = numBytes;
- while ((len = source.read(buffer, 0, (int) Math.min(bytesLeft, buffer.length))) > 0) {
- destination.write(buffer, 0, len);
- bytesLeft -= len;
- }
-
- if (bytesLeft > 0) {
- throw new EOFException("Attempted to copy " + numBytes + " bytes but only " + (numBytes - bytesLeft) + " bytes were available");
- }
- }
-
- /**
- * Reads data from the given input stream, copying it to the destination
- * byte array. If the InputStream has less data than the given byte array,
- * throws an EOFException
- *
- * @param source
- * @param destination
- * @throws IOException
- */
- public static void fillBuffer(final InputStream source, final byte[] destination) throws IOException {
- fillBuffer(source, destination, true);
- }
-
- /**
- * Reads data from the given input stream, copying it to the destination
- * byte array. If the InputStream has less data than the given byte array,
- * throws an EOFException if <code>ensureCapacity</code> is true and
- * otherwise returns the number of bytes copied
- *
- * @param source
- * @param destination
- * @param ensureCapacity whether or not to enforce that the InputStream have
- * at least as much data as the capacity of the destination byte array
- * @return
- * @throws IOException
- */
- public static int fillBuffer(final InputStream source, final byte[] destination, final boolean ensureCapacity) throws IOException {
- int bytesRead = 0;
- int len;
- while (bytesRead < destination.length) {
- len = source.read(destination, bytesRead, destination.length - bytesRead);
- if (len < 0) {
- if (ensureCapacity) {
- throw new EOFException();
- } else {
- break;
- }
- }
-
- bytesRead += len;
- }
-
- return bytesRead;
- }
-
- /**
- * Copies data from in to out until either we are out of data (returns null)
- * or we hit one of the byte patterns identified by the
- * <code>stoppers</code> parameter (returns the byte pattern matched). The
- * bytes in the stopper will be copied.
- *
- * @param in
- * @param out
- * @param maxBytes
- * @param stoppers
- * @return the byte array matched, or null if end of stream was reached
- * @throws IOException
- */
- public static byte[] copyInclusive(final InputStream in, final OutputStream out, final int maxBytes, final byte[]... stoppers) throws IOException {
- if (stoppers.length == 0) {
- return null;
- }
-
- final List<NonThreadSafeCircularBuffer> circularBuffers = new ArrayList<NonThreadSafeCircularBuffer>();
- for (final byte[] stopper : stoppers) {
- circularBuffers.add(new NonThreadSafeCircularBuffer(stopper));
- }
-
- long bytesRead = 0;
- while (true) {
- final int next = in.read();
- if (next == -1) {
- return null;
- } else if (maxBytes > 0 && ++bytesRead >= maxBytes) {
- throw new BytePatternNotFoundException("Did not encounter any byte pattern that was expected; data does not appear to be in the expected format");
- }
-
- out.write(next);
-
- for (final NonThreadSafeCircularBuffer circ : circularBuffers) {
- if (circ.addAndCompare((byte) next)) {
- return circ.getByteArray();
- }
- }
- }
- }
-
- /**
- * Copies data from in to out until either we are out of data (returns null)
- * or we hit one of the byte patterns identified by the
- * <code>stoppers</code> parameter (returns the byte pattern matched). The
- * byte pattern matched will NOT be copied to the output and will be un-read
- * from the input.
- *
- * @param in
- * @param out
- * @param maxBytes
- * @param stoppers
- * @return the byte array matched, or null if end of stream was reached
- * @throws IOException
- */
- public static byte[] copyExclusive(final InputStream in, final OutputStream out, final int maxBytes, final byte[]... stoppers) throws IOException {
- if (stoppers.length == 0) {
- return null;
- }
-
- int longest = 0;
- NonThreadSafeCircularBuffer longestBuffer = null;
- final List<NonThreadSafeCircularBuffer> circularBuffers = new ArrayList<NonThreadSafeCircularBuffer>();
- for (final byte[] stopper : stoppers) {
- final NonThreadSafeCircularBuffer circularBuffer = new NonThreadSafeCircularBuffer(stopper);
- if (stopper.length > longest) {
- longest = stopper.length;
- longestBuffer = circularBuffer;
- circularBuffers.add(0, circularBuffer);
- } else {
- circularBuffers.add(circularBuffer);
- }
- }
-
- long bytesRead = 0;
- while (true) {
- final int next = in.read();
- if (next == -1) {
- return null;
- } else if (maxBytes > 0 && bytesRead++ > maxBytes) {
- throw new BytePatternNotFoundException("Did not encounter any byte pattern that was expected; data does not appear to be in the expected format");
- }
-
- for (final NonThreadSafeCircularBuffer circ : circularBuffers) {
- if (circ.addAndCompare((byte) next)) {
- // The longest buffer has some data that may not have been written out yet; we need to make sure
- // that we copy out those bytes.
- final int bytesToCopy = longest - circ.getByteArray().length;
- for (int i = 0; i < bytesToCopy; i++) {
- final int oldestByte = longestBuffer.getOldestByte();
- if (oldestByte != -1) {
- out.write(oldestByte);
- longestBuffer.addAndCompare((byte) 0);
- }
- }
-
- return circ.getByteArray();
- }
- }
-
- if (longestBuffer.isFilled()) {
- out.write(longestBuffer.getOldestByte());
- }
- }
- }
-
- /**
- * Skips the specified number of bytes from the InputStream
- *
- * If unable to skip that number of bytes, throws EOFException
- *
- * @param stream
- * @param bytesToSkip
- * @throws IOException
- */
- public static void skip(final InputStream stream, final long bytesToSkip) throws IOException {
- if (bytesToSkip <= 0) {
- return;
- }
- long totalSkipped = 0L;
-
- // If we have a FileInputStream, calling skip(1000000) will return 1000000 even if the file is only
- // 3 bytes. As a result, we will skip 1 less than the number requested, and then read the last
- // byte in order to make sure that we've consumed the number of bytes requested. We then check that
- // the final byte, which we read, is not -1.
- final long actualBytesToSkip = bytesToSkip - 1;
- while (totalSkipped < actualBytesToSkip) {
- final long skippedThisIteration = stream.skip(actualBytesToSkip - totalSkipped);
- if (skippedThisIteration == 0) {
- final int nextByte = stream.read();
- if (nextByte == -1) {
- throw new EOFException();
- } else {
- totalSkipped++;
- }
- }
-
- totalSkipped += skippedThisIteration;
- }
-
- final int lastByte = stream.read();
- if (lastByte == -1) {
- throw new EOFException();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java
deleted file mode 100644
index 2b9050d..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.OutputStream;
-
-/**
- * This class extends the {@link java.util.zip.ZipOutputStream} by providing a
- * constructor that allows the user to specify the compression level. The
- * default compression level is 1, as opposed to Java's default of 5.
- */
-public class ZipOutputStream extends java.util.zip.ZipOutputStream {
-
- public static final int DEFAULT_COMPRESSION_LEVEL = 1;
-
- public ZipOutputStream(final OutputStream out) {
- this(out, DEFAULT_COMPRESSION_LEVEL);
- }
-
- public ZipOutputStream(final OutputStream out, final int compressionLevel) {
- super(out);
- def.setLevel(compressionLevel);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java
deleted file mode 100644
index 5d08616..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io.exception;
-
-import java.io.IOException;
-
-public class BytePatternNotFoundException extends IOException {
-
- private static final long serialVersionUID = -4128911284318513973L;
-
- public BytePatternNotFoundException(final String explanation) {
- super(explanation);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java
deleted file mode 100644
index b4b4c17..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io.util;
-
-import java.util.Arrays;
-
-public class NonThreadSafeCircularBuffer {
-
- private final byte[] lookingFor;
- private final int[] buffer;
- private int insertionPointer = 0;
- private int bufferSize = 0;
-
- public NonThreadSafeCircularBuffer(final byte[] lookingFor) {
- this.lookingFor = lookingFor;
- buffer = new int[lookingFor.length];
- Arrays.fill(buffer, -1);
- }
-
- public byte[] getByteArray() {
- return lookingFor;
- }
-
- /**
- * Returns the oldest byte in the buffer
- *
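- * @return the byte in the slot that will be overwritten next (sign-extended to an int); -1 while that slot is still unwritten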
- */
- public int getOldestByte() {
- return buffer[insertionPointer];
- }
-
- public boolean isFilled() {
- return bufferSize >= buffer.length;
- }
-
- public boolean addAndCompare(final byte data) {
- buffer[insertionPointer] = data;
- insertionPointer = (insertionPointer + 1) % lookingFor.length;
-
- bufferSize++;
- if (bufferSize < lookingFor.length) {
- return false;
- }
-
- for (int i = 0; i < lookingFor.length; i++) {
- final byte compare = (byte) buffer[(insertionPointer + i) % lookingFor.length];
- if (compare != lookingFor[i]) {
- return false;
- }
- }
-
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java
deleted file mode 100644
index 92061e0..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-public class BooleanHolder extends ObjectHolder<Boolean> {
-
- public BooleanHolder(final boolean initialValue) {
- super(initialValue);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
deleted file mode 100644
index 805223f..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Locale;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class FormatUtils {
-
- private static final String UNION = "|";
-
- // for Data Sizes
- private static final double BYTES_IN_KILOBYTE = 1024;
- private static final double BYTES_IN_MEGABYTE = BYTES_IN_KILOBYTE * 1024;
- private static final double BYTES_IN_GIGABYTE = BYTES_IN_MEGABYTE * 1024;
- private static final double BYTES_IN_TERABYTE = BYTES_IN_GIGABYTE * 1024;
-
- // for Time Durations
- private static final String NANOS = join(UNION, "ns", "nano", "nanos", "nanoseconds");
- private static final String MILLIS = join(UNION, "ms", "milli", "millis", "milliseconds");
- private static final String SECS = join(UNION, "s", "sec", "secs", "second", "seconds");
- private static final String MINS = join(UNION, "m", "min", "mins", "minute", "minutes");
- private static final String HOURS = join(UNION, "h", "hr", "hrs", "hour", "hours");
- private static final String DAYS = join(UNION, "d", "day", "days");
-
- private static final String VALID_TIME_UNITS = join(UNION, NANOS, MILLIS, SECS, MINS, HOURS, DAYS);
- public static final String TIME_DURATION_REGEX = "(\\d+)\\s*(" + VALID_TIME_UNITS + ")";
- public static final Pattern TIME_DURATION_PATTERN = Pattern.compile(TIME_DURATION_REGEX);
-
- /**
- * Formats the specified count by adding commas.
- *
- * @param count
- * @return
- */
- public static String formatCount(final long count) {
- return NumberFormat.getIntegerInstance().format(count);
- }
-
- /**
- * Formats the specified duration in 'mm:ss.SSS' format.
- *
- * @param sourceDuration
- * @param sourceUnit
- * @return
- */
- public static String formatMinutesSeconds(final long sourceDuration, final TimeUnit sourceUnit) {
- final long millis = TimeUnit.MILLISECONDS.convert(sourceDuration, sourceUnit);
- final SimpleDateFormat formatter = new SimpleDateFormat("mm:ss.SSS", Locale.US);
- return formatter.format(new Date(millis));
- }
-
- /**
- * Formats the specified duration in 'HH:mm:ss.SSS' format.
- *
- * @param sourceDuration
- * @param sourceUnit
- * @return
- */
- public static String formatHoursMinutesSeconds(final long sourceDuration, final TimeUnit sourceUnit) {
- final long millis = TimeUnit.MILLISECONDS.convert(sourceDuration, sourceUnit);
- final long millisInHour = TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
- final int hours = (int) (millis / millisInHour);
- final long whatsLeft = millis - hours * millisInHour;
-
- return pad(hours) + ":" + new SimpleDateFormat("mm:ss.SSS", Locale.US).format(new Date(whatsLeft));
- }
-
- private static String pad(final int val) {
- return (val < 10) ? "0" + val : String.valueOf(val);
- }
-
- /**
- * Formats the specified data size in human readable format.
- *
- * @param dataSize Data size in bytes
- * @return Human readable format
- */
- public static String formatDataSize(final double dataSize) {
- // initialize the formatter
- final NumberFormat format = NumberFormat.getNumberInstance();
- format.setMaximumFractionDigits(2);
-
- // check terabytes
- double dataSizeToFormat = dataSize / BYTES_IN_TERABYTE;
- if (dataSizeToFormat > 1) {
- return format.format(dataSizeToFormat) + " TB";
- }
-
- // check gigabytes
- dataSizeToFormat = dataSize / BYTES_IN_GIGABYTE;
- if (dataSizeToFormat > 1) {
- return format.format(dataSizeToFormat) + " GB";
- }
-
- // check megabytes
- dataSizeToFormat = dataSize / BYTES_IN_MEGABYTE;
- if (dataSizeToFormat > 1) {
- return format.format(dataSizeToFormat) + " MB";
- }
-
- // check kilobytes
- dataSizeToFormat = dataSize / BYTES_IN_KILOBYTE;
- if (dataSizeToFormat > 1) {
- return format.format(dataSizeToFormat) + " KB";
- }
-
- // default to bytes
- return format.format(dataSize) + " bytes";
- }
-
- public static long getTimeDuration(final String value, final TimeUnit desiredUnit) {
- final Matcher matcher = TIME_DURATION_PATTERN.matcher(value.toLowerCase());
- if (!matcher.matches()) {
- throw new IllegalArgumentException("Value '" + value + "' is not a valid Time Duration");
- }
-
- final String duration = matcher.group(1);
- final String units = matcher.group(2);
- TimeUnit specifiedTimeUnit = null;
- switch (units.toLowerCase()) {
- case "ns":
- case "nano":
- case "nanos":
- case "nanoseconds":
- specifiedTimeUnit = TimeUnit.NANOSECONDS;
- break;
- case "ms":
- case "milli":
- case "millis":
- case "milliseconds":
- specifiedTimeUnit = TimeUnit.MILLISECONDS;
- break;
- case "s":
- case "sec":
- case "secs":
- case "second":
- case "seconds":
- specifiedTimeUnit = TimeUnit.SECONDS;
- break;
- case "m":
- case "min":
- case "mins":
- case "minute":
- case "minutes":
- specifiedTimeUnit = TimeUnit.MINUTES;
- break;
- case "h":
- case "hr":
- case "hrs":
- case "hour":
- case "hours":
- specifiedTimeUnit = TimeUnit.HOURS;
- break;
- case "d":
- case "day":
- case "days":
- specifiedTimeUnit = TimeUnit.DAYS;
- break;
- }
-
- final long durationVal = Long.parseLong(duration);
- return desiredUnit.convert(durationVal, specifiedTimeUnit);
- }
-
- public static String formatUtilization(final double utilization) {
- return utilization + "%";
- }
-
- private static String join(final String delimiter, final String... values) {
- if (values.length == 0) {
- return "";
- } else if (values.length == 1) {
- return values[0];
- }
-
- final StringBuilder sb = new StringBuilder();
- sb.append(values[0]);
- for (int i = 1; i < values.length; i++) {
- sb.append(delimiter).append(values[i]);
- }
-
- return sb.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java
deleted file mode 100644
index 213bbc0..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-public class IntegerHolder extends ObjectHolder<Integer> {
-
- public IntegerHolder(final int initialValue) {
- super(initialValue);
- }
-
- public int addAndGet(final int delta) {
- final int curValue = get();
- final int newValue = curValue + delta;
- set(newValue);
- return newValue;
- }
-
- public int getAndAdd(final int delta) {
- final int curValue = get();
- final int newValue = curValue + delta;
- set(newValue);
- return curValue;
- }
-
- public int incrementAndGet() {
- return addAndGet(1);
- }
-
- public int getAndIncrement() {
- return getAndAdd(1);
- }
-
- public int decrementAndGet() {
- return addAndGet(-1);
- }
-
- public int getAndDecrement() {
- return getAndAdd(-1);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java
deleted file mode 100644
index ef70ce8..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-/**
- * Wraps a Long value so that it can be declared <code>final</code> and still be
- * accessed from within inner classes; the functionality is similar to that of an
- * AtomicLong, but operations on this class are not atomic. This results in
- * greater performance when the atomicity is not needed.
- */
-public class LongHolder extends ObjectHolder<Long> {
-
- public LongHolder(final long initialValue) {
- super(initialValue);
- }
-
- public long addAndGet(final long delta) {
- final long curValue = get();
- final long newValue = curValue + delta;
- set(newValue);
- return newValue;
- }
-
- public long getAndAdd(final long delta) {
- final long curValue = get();
- final long newValue = curValue + delta;
- set(newValue);
- return curValue;
- }
-
- public long incrementAndGet() {
- return addAndGet(1);
- }
-
- public long getAndIncrement() {
- return getAndAdd(1);
- }
-
- public long decrementAndGet() {
- return addAndGet(-1L);
- }
-
- public long getAndDecrement() {
- return getAndAdd(-1L);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java
deleted file mode 100644
index 85bfd96..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.util.Arrays;
-
-/**
- * <p>
- * A RingBuffer that can be used to scan byte sequences for subsequences.
- * </p>
- *
- * <p>
- * This class implements an efficient naive search algorithm, which allows the
- * user of the library to identify byte sequences in a stream on-the-fly so that
- * the stream can be segmented without having to buffer the data.
- * </p>
- *
- * <p>
- * The intended usage paradigm is:
- * <code>
- * <pre>
- * final byte[] searchSequence = ...;
- * final NaiveSearchRingBuffer buffer = new NaiveSearchRingBuffer(searchSequence);
- * for (int nextByte = in.read(); nextByte >= 0; nextByte = in.read()) {
- * if ( buffer.addAndCompare((byte) nextByte) ) {
- * // This byte is the last byte in the given sequence
- * } else {
- * // This byte does not complete the given sequence
- * }
- * }
- * </pre>
- * </code>
- * </p>
- */
-public class NaiveSearchRingBuffer {
-
- private final byte[] lookingFor;
- private final int[] buffer;
- private int insertionPointer = 0;
- private int bufferSize = 0;
-
- public NaiveSearchRingBuffer(final byte[] lookingFor) {
- this.lookingFor = lookingFor;
- this.buffer = new int[lookingFor.length];
- Arrays.fill(buffer, -1);
- }
-
- /**
- * Returns the contents of the internal buffer, which represents the last X
- * bytes added to the buffer, where X is the minimum of the number of bytes
- * added to the buffer or the length of the byte sequence for which we are
- * looking
- *
- * @return
- */
- public byte[] getBufferContents() {
- final int contentLength = Math.min(lookingFor.length, bufferSize);
- final byte[] contents = new byte[contentLength];
- for (int i = 0; i < contentLength; i++) {
- final byte nextByte = (byte) buffer[(insertionPointer + i) % lookingFor.length];
- contents[i] = nextByte;
- }
- return contents;
- }
-
- /**
- * Returns the oldest byte in the buffer
- *
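- * @return the byte in the slot that will be overwritten next (sign-extended to an int); -1 while that slot is still unwritten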
- */
- public int getOldestByte() {
- return buffer[insertionPointer];
- }
-
- /**
- * Returns <code>true</code> if the number of bytes that have been added to
- * the buffer is at least equal to the length of the byte sequence for which
- * we are searching
- *
- * @return
- */
- public boolean isFilled() {
- return bufferSize >= buffer.length;
- }
-
- /**
- * Clears the internal buffer so that a new search may begin
- */
- public void clear() {
- Arrays.fill(buffer, -1);
- insertionPointer = 0;
- bufferSize = 0;
- }
-
- /**
- * Add the given byte to the buffer and notify whether or not the byte
- * completes the desired byte sequence.
- *
- * @param data
- * @return <code>true</code> if this byte completes the byte sequence,
- * <code>false</code> otherwise.
- */
- public boolean addAndCompare(final byte data) {
- buffer[insertionPointer] = data;
- insertionPointer = (insertionPointer + 1) % lookingFor.length;
-
- bufferSize++;
- if (bufferSize < lookingFor.length) {
- return false;
- }
-
- for (int i = 0; i < lookingFor.length; i++) {
- final byte compare = (byte) buffer[(insertionPointer + i) % lookingFor.length];
- if (compare != lookingFor[i]) {
- return false;
- }
- }
-
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java
deleted file mode 100644
index a58ec6a..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-/**
- * A bean that holds a single value of type T.
- *
- * @param <T>
- */
-public class ObjectHolder<T> {
-
- private T value;
-
- public ObjectHolder(final T initialValue) {
- this.value = initialValue;
- }
-
- public T get() {
- return value;
- }
-
- public void set(T value) {
- this.value = value;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java
deleted file mode 100644
index c0bb830..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * Thread-safe implementation of a RingBuffer
- *
- * @param <T>
- */
-public class RingBuffer<T> {
-
- private final Object[] buffer;
- private int insertionPointer = 0;
- private boolean filled = false;
-
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
- private final Lock readLock = rwLock.readLock();
- private final Lock writeLock = rwLock.writeLock();
-
- public RingBuffer(final int size) {
- buffer = new Object[size];
- }
-
- /**
- * Adds the given value to the RingBuffer and returns the value that was
- * removed in order to make room.
- *
- * @param value
- * @return the element previously stored in the overwritten slot, or null if that slot was empty
- */
- @SuppressWarnings("unchecked")
- public T add(final T value) {
- Objects.requireNonNull(value);
-
- writeLock.lock();
- try {
- final Object removed = buffer[insertionPointer];
-
- buffer[insertionPointer] = value;
-
- if (insertionPointer == buffer.length - 1) {
- filled = true;
- }
-
- insertionPointer = (insertionPointer + 1) % buffer.length;
- return (T) removed;
- } finally {
- writeLock.unlock();
- }
- }
-
- public int getSize() {
- readLock.lock();
- try {
- return filled ? buffer.length : insertionPointer;
- } finally {
- readLock.unlock();
- }
- }
-
- public List<T> getSelectedElements(final Filter<T> filter) {
- return getSelectedElements(filter, Integer.MAX_VALUE);
- }
-
- public List<T> getSelectedElements(final Filter<T> filter, final int maxElements) {
- final List<T> selected = new ArrayList<>(1000);
- int numSelected = 0;
- readLock.lock();
- try {
- for (int i = 0; i < buffer.length && numSelected < maxElements; i++) {
- final int idx = (insertionPointer + i) % buffer.length;
- final Object val = buffer[idx];
- if (val == null) {
- continue;
- }
-
- @SuppressWarnings("unchecked")
- final T element = (T) val;
- if (filter.select(element)) {
- selected.add(element);
- numSelected++;
- }
- }
- } finally {
- readLock.unlock();
- }
- return selected;
- }
-
- public int countSelectedElements(final Filter<T> filter) {
- int numSelected = 0;
- readLock.lock();
- try {
- for (int i = 0; i < buffer.length; i++) {
- final int idx = (insertionPointer + i) % buffer.length;
- final Object val = buffer[idx];
- if (val == null) {
- continue;
- }
-
- @SuppressWarnings("unchecked")
- final T element = (T) val;
- if (filter.select(element)) {
- numSelected++;
- }
- }
- } finally {
- readLock.unlock();
- }
-
- return numSelected;
- }
-
- /**
- * Removes all elements from the RingBuffer that match the given filter
- *
- * @param filter
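- * @return the local removal counter; NOTE: it is never incremented in this version, so the result is always 0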
- */
- public int removeSelectedElements(final Filter<T> filter) {
- int count = 0;
-
- writeLock.lock();
- try {
- for (int i = 0; i < buffer.length; i++) {
- final int idx = (insertionPointer + i + 1) % buffer.length;
- final Object val = buffer[idx];
- if (val == null) {
- continue;
- }
-
- @SuppressWarnings("unchecked")
- final T element = (T) val;
-
- if (filter.select(element)) {
- buffer[idx] = null;
- }
- }
- } finally {
- writeLock.unlock();
- }
-
- return count;
- }
-
- public List<T> asList() {
- return getSelectedElements(new Filter<T>() {
- @Override
- public boolean select(final T value) {
- return true;
- }
- });
- }
-
- public T getOldestElement() {
- readLock.lock();
- try {
- return getElementData(insertionPointer);
- } finally {
- readLock.unlock();
- }
- }
-
- public T getNewestElement() {
- readLock.lock();
- try {
- int index = (insertionPointer == 0) ? buffer.length : insertionPointer - 1;
- return getElementData(index);
- } finally {
- readLock.unlock();
- }
- }
-
- @SuppressWarnings("unchecked")
- private T getElementData(final int index) {
- readLock.lock();
- try {
- return (T) buffer[index];
- } finally {
- readLock.unlock();
- }
- }
-
- /**
- * Iterates over each element in the RingBuffer, calling the
- * {@link ForEachEvaluator#evaluate(Object) evaluate} method on each element
- * in the RingBuffer. If the Evaluator returns {@code false}, the method
- * will skip all remaining elements in the RingBuffer; otherwise, the next
- * element will be evaluated until all elements have been evaluated.
- *
- * @param evaluator
- */
- public void forEach(final ForEachEvaluator<T> evaluator) {
- forEach(evaluator, IterationDirection.FORWARD);
- }
-
- /**
- * Iterates over each element in the RingBuffer, calling the
- * {@link ForEachEvaluator#evaluate(Object) evaluate} method on each element
- * in the RingBuffer. If the Evaluator returns {@code false}, the method
- * will skip all remaining elements in the RingBuffer; otherwise, the next
- * element will be evaluated until all elements have been evaluated.
- *
- * @param evaluator
- * @param iterationDirection the order in which to iterate over the elements
- * in the RingBuffer
- */
- public void forEach(final ForEachEvaluator<T> evaluator, final IterationDirection iterationDirection) {
- readLock.lock();
- try {
- final int startIndex;
- final int endIndex;
- final int increment;
-
- if (iterationDirection == IterationDirection.FORWARD) {
- startIndex = 0;
- endIndex = buffer.length - 1;
- increment = 1;
- } else {
- startIndex = buffer.length - 1;
- endIndex = 0;
- increment = -1;
- }
-
- for (int i = startIndex; (iterationDirection == IterationDirection.FORWARD ? i <= endIndex : i >= endIndex); i += increment) {
- final int idx = (insertionPointer + i) % buffer.length;
- final Object val = buffer[idx];
- if (val == null) {
- continue;
- }
-
- @SuppressWarnings("unchecked")
- final T element = (T) val;
- if (!evaluator.evaluate(element)) {
- return;
- }
- }
- } finally {
- readLock.unlock();
- }
- }
-
- public static interface Filter<S> {
-
- boolean select(S value);
- }
-
- /**
- * Defines an interface that can be used to iterate over all of the elements
- * in the RingBuffer via the {@link #forEach} method
- *
- * @param <S>
- */
- public static interface ForEachEvaluator<S> {
-
- /**
- * Evaluates the given element and returns {@code true} if the next
- * element should be evaluated, {@code false} otherwise
- *
- * @param value
- * @return
- */
- boolean evaluate(S value);
- }
-
- public static enum IterationDirection {
-
- FORWARD,
- BACKWARD;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java
deleted file mode 100644
index cd11930..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.util.concurrent.TimeUnit;
-
-public final class StopWatch {
-
- private long startNanos = -1L;
- private long duration = -1L;
-
- /**
- * Creates a StopWatch but does not start it
- */
- public StopWatch() {
- this(false);
- }
-
- /**
- * @param autoStart whether or not the timer should be started automatically
- */
- public StopWatch(final boolean autoStart) {
- if (autoStart) {
- start();
- }
- }
-
- public void start() {
- this.startNanos = System.nanoTime();
- this.duration = -1L;
- }
-
- public void stop() {
- if (startNanos < 0) {
- throw new IllegalStateException("StopWatch has not been started");
- }
- this.duration = System.nanoTime() - startNanos;
- this.startNanos = -1L;
- }
-
- /**
- * Returns the amount of time that the StopWatch was running.
- *
- * @param timeUnit
- * @return
- *
- * @throws IllegalStateException if the StopWatch has not been stopped via
- * {@link #stop()}
- */
- public long getDuration(final TimeUnit timeUnit) {
- if (duration < 0) {
- throw new IllegalStateException("Cannot get duration until StopWatch has been stopped");
- }
- return timeUnit.convert(duration, TimeUnit.NANOSECONDS);
- }
-
- /**
- * Returns the amount of time that has elapsed since the timer was started.
- *
- * @param timeUnit
- * @return
- */
- public long getElapsed(final TimeUnit timeUnit) {
- return timeUnit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
- }
-
- public String calculateDataRate(final long bytes) {
- final double seconds = (double) duration / 1000000000.0D;
- final long dataSize = (long) (bytes / seconds);
- return FormatUtils.formatDataSize(dataSize) + "/sec";
- }
-
- public String getDuration() {
- final StringBuilder sb = new StringBuilder();
-
- long duration = this.duration;
- final long minutes = (duration > 60000000000L) ? (duration / 60000000000L) : 0L;
- duration -= TimeUnit.NANOSECONDS.convert(minutes, TimeUnit.MINUTES);
-
- final long seconds = (duration > 1000000000L) ? (duration / 1000000000L) : 0L;
- duration -= TimeUnit.NANOSECONDS.convert(seconds, TimeUnit.SECONDS);
-
- final long millis = (duration > 1000000L) ? (duration / 1000000L) : 0L;
- duration -= TimeUnit.NANOSECONDS.convert(millis, TimeUnit.MILLISECONDS);
-
- final long nanos = duration % 1000000L;
-
- if (minutes > 0) {
- sb.append(minutes).append(" minutes");
- }
-
- if (seconds > 0) {
- if (minutes > 0) {
- sb.append(", ");
- }
-
- sb.append(seconds).append(" seconds");
- }
-
- if (millis > 0) {
- if (seconds > 0) {
- sb.append(", ");
- }
-
- sb.append(millis).append(" millis");
- }
- if (seconds == 0 && millis == 0) {
- sb.append(nanos).append(" nanos");
- }
-
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java
deleted file mode 100644
index 63736ed..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-/**
- *
- * @author unattrib
- * @param <A>
- * @param <B>
- */
-public class Tuple<A, B> {
-
- final A key;
- final B value;
-
- public Tuple(A key, B value) {
- this.key = key;
- this.value = value;
- }
-
- public A getKey() {
- return key;
- }
-
- public B getValue() {
- return value;
- }
-
- @Override
- public boolean equals(final Object other) {
- if (other == null) {
- return false;
- }
- if (other == this) {
- return true;
- }
- if (!(other instanceof Tuple)) {
- return false;
- }
-
- final Tuple<?, ?> tuple = (Tuple<?, ?>) other;
- if (key == null) {
- if (tuple.key != null) {
- return false;
- }
- } else {
- if (!key.equals(tuple.key)) {
- return false;
- }
- }
-
- if (value == null) {
- if (tuple.value != null) {
- return false;
- }
- } else {
- if (!value.equals(tuple.value)) {
- return false;
- }
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return 581 + (this.key == null ? 0 : this.key.hashCode()) + (this.value == null ? 0 : this.value.hashCode());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java
deleted file mode 100644
index a8d7e82..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.concurrency;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-
-/**
- * A {@link DebuggableTimedLock} that delegates straight to the wrapped
- * {@link Lock} without collecting any statistics or logging.
- */
-public class DebugDisabledTimedLock implements DebuggableTimedLock {
-
-    private final Lock lock;
-
-    /**
-     * @param lock the lock to which every operation is delegated
-     */
-    public DebugDisabledTimedLock(final Lock lock) {
-        this.lock = lock;
-    }
-
-    /**
-     * Attempts to obtain the lock without waiting.
-     *
-     * @return {@code true} if the lock was obtained, {@code false} otherwise
-     */
-    @Override
-    public boolean tryLock() {
-        return lock.tryLock();
-    }
-
-    /**
-     * Attempts to obtain the lock, waiting up to the given timeout.
-     *
-     * @param timeout the maximum time to wait
-     * @param timeUnit the unit of {@code timeout}
-     * @return {@code true} if the lock was obtained; {@code false} if the
-     *         timeout elapsed or the calling thread was interrupted while
-     *         waiting (in which case the thread's interrupt status is set)
-     */
-    @Override
-    public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
-        try {
-            return lock.tryLock(timeout, timeUnit);
-        } catch (final InterruptedException e) {
-            // The interface cannot propagate the exception, so restore the
-            // interrupt status instead of silently discarding the interruption.
-            Thread.currentThread().interrupt();
-            return false;
-        }
-    }
-
-    /**
-     * Obtains the lock, blocking until it is available.
-     */
-    @Override
-    public void lock() {
-        lock.lock();
-    }
-
-    /**
-     * Releases the lock.
-     *
-     * @param task a label for the work that was done while holding the lock;
-     *            ignored by this implementation
-     */
-    @Override
-    public void unlock(final String task) {
-        lock.unlock();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java
deleted file mode 100644
index f082168..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.concurrency;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link DebuggableTimedLock} that wraps a {@link Lock}, traces every
- * acquisition and release, and accumulates per-task hold times. Every
- * {@code iterationFrequency}-th release for a task, the accumulated
- * statistics are logged at debug level.
- * <p>
- * The bookkeeping fields are only touched between acquiring and releasing the
- * wrapped lock. NOTE(review): this presumes the wrapped lock is exclusive;
- * confirm against callers.
- */
-public class DebugEnabledTimedLock implements DebuggableTimedLock {
-
-    private final Lock lock;
-    private final Logger logger;
-
-    // System.nanoTime() reading taken when the lock was obtained; only meaningful while 'timing' is true.
-    private long lockTime = 0L;
-    // Whether lockTime holds a start timestamp for the current hold. A separate flag is used
-    // because System.nanoTime() may legitimately return zero or a negative value.
-    private boolean timing = false;
-
-    // Number of timed releases per task label.
-    private final Map<String, Long> lockIterations = new HashMap<>();
-    // Total nanoseconds the lock was held per task label.
-    private final Map<String, Long> lockNanos = new HashMap<>();
-
-    private final String name;
-    private final int iterationFrequency;
-
-    /**
-     * @param lock the lock to wrap
-     * @param name the name of the lock, used in log messages and in the logger name
-     * @param iterationFrequency how many releases for a task must occur between
-     *            debug log messages; a value of zero disables the periodic message
-     */
-    public DebugEnabledTimedLock(final Lock lock, final String name, final int iterationFrequency) {
-        this.lock = lock;
-        this.name = name;
-        this.iterationFrequency = iterationFrequency;
-        logger = LoggerFactory.getLogger(TimedLock.class.getName() + "." + name);
-    }
-
-    /**
-     * Attempts to obtain the lock without waiting.
-     *
-     * @return {@code true} if the lock was obtained, {@code false} otherwise
-     */
-    @Override
-    public boolean tryLock() {
-        logger.trace("Trying to obtain Lock: {}", name);
-        final boolean success = lock.tryLock();
-        if (!success) {
-            logger.trace("TryLock failed for Lock: {}", name);
-            return false;
-        }
-
-        // Start timing so that holds obtained via tryLock are counted too.
-        startTiming();
-        logger.trace("TryLock successful");
-
-        return true;
-    }
-
-    /**
-     * Attempts to obtain the lock, waiting up to the given timeout.
-     *
-     * @param timeout the maximum time to wait
-     * @param timeUnit the unit of {@code timeout}
-     * @return {@code true} if the lock was obtained; {@code false} if the
-     *         timeout elapsed or the calling thread was interrupted while
-     *         waiting (in which case the thread's interrupt status is set)
-     */
-    @Override
-    public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
-        logger.trace("Trying to obtain Lock {} with a timeout of {} {}", name, timeout, timeUnit);
-        final boolean success;
-        try {
-            success = lock.tryLock(timeout, timeUnit);
-        } catch (final InterruptedException ie) {
-            // The interface cannot propagate the exception, so restore the
-            // interrupt status instead of silently discarding the interruption.
-            Thread.currentThread().interrupt();
-            return false;
-        }
-
-        if (!success) {
-            logger.trace("TryLock failed for Lock {} with a timeout of {} {}", name, timeout, timeUnit);
-            return false;
-        }
-
-        startTiming();
-        logger.trace("TryLock successful");
-        return true;
-    }
-
-    /**
-     * Obtains the lock, blocking until it is available, and starts timing the hold.
-     */
-    @Override
-    public void lock() {
-        logger.trace("Obtaining Lock {}", name);
-        lock.lock();
-        startTiming();
-        logger.trace("Obtained Lock {}", name);
-    }
-
-    /**
-     * Releases the lock and, if the hold was being timed, adds its duration to
-     * the statistics kept for {@code task}.
-     *
-     * @param task a label for the work that was done while holding the lock
-     */
-    @Override
-    public void unlock(final String task) {
-        if (!timing) {
-            // No start time was recorded for this hold (e.g. a nested release);
-            // there is nothing to measure.
-            lock.unlock();
-            return;
-        }
-
-        logger.trace("Releasing Lock {}", name);
-        final long nanosLocked = System.nanoTime() - lockTime;
-
-        Long startIterations = lockIterations.get(task);
-        if (startIterations == null) {
-            startIterations = 0L;
-        }
-        final long iterations = startIterations + 1L;
-        lockIterations.put(task, iterations);
-
-        Long startNanos = lockNanos.get(task);
-        if (startNanos == null) {
-            startNanos = 0L;
-        }
-        final long totalNanos = startNanos + nanosLocked;
-        lockNanos.put(task, totalNanos);
-
-        // Clear the timing state before releasing, while the lock is still held.
-        timing = false;
-        lockTime = -1L;
-
-        lock.unlock();
-        logger.trace("Released Lock {}", name);
-
-        // Guard against a zero frequency, which would otherwise throw ArithmeticException.
-        if (iterationFrequency != 0 && iterations % iterationFrequency == 0) {
-            logger.debug("Lock {} held for {} nanos for task: {}; total lock iterations: {}; total lock nanos: {}", name, nanosLocked, task, iterations, totalNanos);
-        }
-    }
-
-    /**
-     * Records the moment the lock was obtained. Must be called only while the lock is held.
-     */
-    private void startTiming() {
-        lockTime = System.nanoTime();
-        timing = true;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java
deleted file mode 100644
index 69da6e8..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.concurrency;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * A lock whose release is labelled with the task that was performed while it
- * was held, so that implementations may attribute hold times to tasks.
- */
-public interface DebuggableTimedLock {
-
-    /**
-     * Attempts to obtain the lock without waiting.
-     *
-     * @return {@code true} if the lock was obtained, {@code false} otherwise
-     */
-    boolean tryLock();
-
-    /**
-     * Attempts to obtain the lock, waiting up to the given amount of time.
-     *
-     * @param timePeriod the maximum time to wait
-     * @param timeUnit the unit of {@code timePeriod}
-     * @return {@code true} if the lock was obtained, {@code false} otherwise
-     */
-    boolean tryLock(long timePeriod, TimeUnit timeUnit);
-
-    /**
-     * Obtains the lock, blocking until it is available.
-     */
-    void lock();
-
-    /**
-     * Releases the lock.
-     *
-     * @param task a label for the work that was done while holding the lock
-     */
-    void unlock(String task);
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java
deleted file mode 100644
index 532d3c3..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.concurrency;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A lock facade that delegates to a timing, logging implementation while
- * debug logging is enabled for this lock's logger, and to a plain
- * pass-through implementation otherwise. The choice is re-evaluated on every
- * call; both delegates wrap the same underlying {@link Lock}.
- */
-public class TimedLock {
-
-    private final DebugEnabledTimedLock enabled;
-    private final DebugDisabledTimedLock disabled;
-
-    private final Logger logger;
-
-    /**
-     * @param lock the underlying lock
-     * @param name the name of the lock; appended to this class's name to form the logger name
-     * @param iterationFrequency passed to the debug-enabled delegate to control
-     *            how often it reports statistics
-     */
-    public TimedLock(final Lock lock, final String name, final int iterationFrequency) {
-        enabled = new DebugEnabledTimedLock(lock, name, iterationFrequency);
-        disabled = new DebugDisabledTimedLock(lock);
-
-        final String loggerName = TimedLock.class.getName() + "." + name;
-        logger = LoggerFactory.getLogger(loggerName);
-    }
-
-    /**
-     * Picks the delegate that matches the logger's current debug setting.
-     */
-    private DebuggableTimedLock getLock() {
-        if (logger.isDebugEnabled()) {
-            return enabled;
-        }
-        return disabled;
-    }
-
-    /**
-     * Attempts to obtain the lock without waiting.
-     *
-     * @return the result reported by the selected delegate
-     */
-    public boolean tryLock() {
-        final DebuggableTimedLock delegate = getLock();
-        return delegate.tryLock();
-    }
-
-    /**
-     * Attempts to obtain the lock, waiting up to the given timeout.
-     *
-     * @param timeout the maximum time to wait
-     * @param timeUnit the unit of {@code timeout}
-     * @return the result reported by the selected delegate
-     */
-    public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
-        final DebuggableTimedLock delegate = getLock();
-        return delegate.tryLock(timeout, timeUnit);
-    }
-
-    /**
-     * Obtains the lock through the selected delegate.
-     */
-    public void lock() {
-        final DebuggableTimedLock delegate = getLock();
-        delegate.lock();
-    }
-
-    /**
-     * Releases the lock through the selected delegate.
-     *
-     * @param task a label for the work that was done while holding the lock
-     */
-    public void unlock(final String task) {
-        final DebuggableTimedLock delegate = getLock();
-        delegate.unlock(task);
-    }
-
-}