You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/11/18 19:54:06 UTC
[6/7] nifi git commit: NIFI-2854: Refactor repositories and swap
files to use schema-based serialization so that nifi can be rolled back to a
previous version after an upgrade.
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
index 2afaa70..324f59f 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
@@ -16,19 +16,445 @@
*/
package org.apache.nifi.stream.io;
+import java.io.IOException;
import java.io.InputStream;
/**
* This class is a slight modification of the BufferedInputStream in the java.io package. The modification is that this implementation does not provide synchronization on method calls, which means
* that this class is not suitable for use by multiple threads. However, the absence of these synchronized blocks results in potentially much better performance.
*/
-public class BufferedInputStream extends java.io.BufferedInputStream {
+public class BufferedInputStream extends InputStream {
- public BufferedInputStream(final InputStream in) {
- super(in);
+ /**
+ * The underlying input stream from which bytes are pulled into the buffer.
+ */
+ private final InputStream in;
+
+ /**
+ * Buffer size used by the single-argument constructor.
+ */
+ private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+ /**
+ * The maximum size of array to allocate.
+ * Some VMs reserve some header words in an array.
+ * Attempts to allocate larger arrays may result in
+ * OutOfMemoryError: Requested array size exceeds VM limit
+ */
+ private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
+
+ /**
+ * The internal buffer array where the data is stored. When necessary,
+ * it may be replaced by another array of
+ * a different size.
+ */
+ protected byte[] buf;
+
+ /**
+ * The index one greater than the index of the last valid byte in
+ * the buffer.
+ * This value is always
+ * in the range <code>0</code> through <code>buf.length</code>;
+ * elements <code>buf[0]</code> through <code>buf[count-1]
+ * </code>contain buffered input data obtained
+ * from the underlying input stream.
+ */
+ private int count;
+
+ /**
+ * The current position in the buffer. This is the index of the next
+ * byte to be read from the <code>buf</code> array.
+ * <p>
+ * This value is always in the range <code>0</code>
+ * through <code>count</code>. If it is less
+ * than <code>count</code>, then <code>buf[pos]</code>
+ * is the next byte to be supplied as input;
+ * if it is equal to <code>count</code>, then
+ * the next <code>read</code> or <code>skip</code>
+ * operation will require more bytes to be
+ * read from the contained input stream.
+ *
+ * @see java.io.BufferedInputStream#buf
+ */
+ private int pos;
+
+ /**
+ * The value of the <code>pos</code> field at the time the last
+ * <code>mark</code> method was called.
+ * <p>
+ * This value is always
+ * in the range <code>-1</code> through <code>pos</code>.
+ * If there is no marked position in the input
+ * stream, this field is <code>-1</code>. If
+ * there is a marked position in the input
+ * stream, then <code>buf[markpos]</code>
+ * is the first byte to be supplied as input
+ * after a <code>reset</code> operation. If
+ * <code>markpos</code> is not <code>-1</code>,
+ * then all bytes from positions <code>buf[markpos]</code>
+ * through <code>buf[pos-1]</code> must remain
+ * in the buffer array (though they may be
+ * moved to another place in the buffer array,
+ * with suitable adjustments to the values
+ * of <code>count</code>, <code>pos</code>,
+ * and <code>markpos</code>); they may not
+ * be discarded unless and until the difference
+ * between <code>pos</code> and <code>markpos</code>
+ * exceeds <code>marklimit</code>.
+ *
+ * @see java.io.BufferedInputStream#mark(int)
+ * @see java.io.BufferedInputStream#pos
+ */
+ protected int markpos = -1;
+
+ /**
+ * The maximum read ahead allowed after a call to the
+ * <code>mark</code> method before subsequent calls to the
+ * <code>reset</code> method fail.
+ * Whenever the difference between <code>pos</code>
+ * and <code>markpos</code> exceeds <code>marklimit</code>,
+ * then the mark may be dropped by setting
+ * <code>markpos</code> to <code>-1</code>.
+ *
+ * @see java.io.BufferedInputStream#mark(int)
+ * @see java.io.BufferedInputStream#reset()
+ */
+ protected int marklimit;
+
+ /**
+ * Returns the wrapped input stream, refusing to hand out a missing one.
+ *
+ * @return the underlying stream, never null
+ * @throws IOException with message "Stream closed" if there is no underlying stream
+ */
+ private InputStream getInIfOpen() throws IOException {
+ if (in != null) {
+ return in;
+ }
+ throw new IOException("Stream closed");
+ }
+
+ /**
+ * Check to make sure that buffer has not been nulled out due to
+ * close; if not return it;
+ */
+ private byte[] getBufIfOpen() throws IOException {
+ if (buf == null) {
+ throw new IOException("Stream closed");
+ }
+ return buf;
+ }
+
+ /**
+ * Creates a <code>BufferedInputStream</code> that reads from <code>in</code>
+ * through an internal buffer of the default size
+ * (<code>DEFAULT_BUFFER_SIZE</code>).
+ *
+ * @param in the underlying input stream.
+ */
+ public BufferedInputStream(InputStream in) {
+ this(in, BufferedInputStream.DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Creates a <code>BufferedInputStream</code> that reads from <code>in</code>
+ * through an internal buffer holding <code>size</code> bytes.
+ *
+ * @param in the underlying input stream.
+ * @param size the buffer size.
+ * @exception IllegalArgumentException if {@code size <= 0}.
+ */
+ public BufferedInputStream(InputStream in, int size) {
+ // Validate before touching any state; an exception here leaves no usable instance.
+ if (size <= 0) {
+ throw new IllegalArgumentException("Buffer size <= 0");
+ }
+ this.in = in;
+ this.buf = new byte[size];
+ }
+
+ /**
+ * Fills the buffer with more data, taking into account
+ * shuffling and other tricks for dealing with marks.
+ * Unlike java.io.BufferedInputStream, this class performs no synchronization,
+ * so callers must confine the stream to a single thread.
+ * This method assumes that all buffered data has already been consumed,
+ * hence pos >= count.
+ * <p>
+ * When a mark forces the buffer to grow, the newly allocated array is stored
+ * back into {@link #buf}. Every reader indexes into <code>buf</code>, so keeping
+ * the grown array only in a local would let <code>count</code> exceed
+ * <code>buf.length</code> and make the next read fail with an
+ * ArrayIndexOutOfBoundsException.
+ */
+ private void fill() throws IOException {
+ byte[] buffer = getBufIfOpen();
+ if (markpos < 0) {
+ pos = 0; /* no mark: throw away the buffer */
+ } else if (pos >= buffer.length) {
+ if (markpos > 0) { /* can throw away early part of the buffer */
+ int sz = pos - markpos;
+ System.arraycopy(buffer, markpos, buffer, 0, sz);
+ pos = sz;
+ markpos = 0;
+ } else if (buffer.length >= marklimit) {
+ markpos = -1; /* buffer got too big, invalidate mark */
+ pos = 0; /* drop buffer contents */
+ } else if (buffer.length >= MAX_BUFFER_SIZE) {
+ throw new OutOfMemoryError("Required array size too large");
+ } else { /* grow buffer */
+ int nsz = (pos <= MAX_BUFFER_SIZE - pos) ? pos * 2 : MAX_BUFFER_SIZE;
+ if (nsz > marklimit) {
+ nsz = marklimit;
+ }
+ byte[] nbuf = new byte[nsz];
+ System.arraycopy(buffer, 0, nbuf, 0, pos);
+ // Publish the grown array: read(), read1() and skip() index into buf, not this local.
+ buf = nbuf;
+ buffer = nbuf;
+ }
+ }
+ count = pos;
+ int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
+ if (n > 0) {
+ count = n + pos;
+ }
+ }
+
+ /**
+ * Reads a single byte, refilling the internal buffer from the underlying
+ * stream once the buffered bytes are exhausted. Follows the general
+ * contract of {@link InputStream#read()}.
+ *
+ * @return the next byte as a value in the range 0-255, or <code>-1</code>
+ * when the end of the stream is reached.
+ * @exception IOException if the internal buffer has been released, or
+ * the underlying stream fails.
+ */
+ @Override
+ public int read() throws IOException {
+ if (pos >= count) {
+ fill();
+ }
+ if (pos >= count) {
+ // fill() could not obtain any new data: end of stream
+ return -1;
+ }
+ final byte[] data = getBufIfOpen();
+ return data[pos++] & 0xff;
+ }
+
+ /**
+ * Read characters into a portion of an array, reading from the underlying
+ * stream at most once if necessary.
+ */
+ private int read1(byte[] b, int off, int len) throws IOException {
+ int avail = count - pos;
+ if (avail <= 0) {
+ /*
+ * If the requested length is at least as large as the buffer, and
+ * if there is no mark/reset activity, do not bother to copy the
+ * bytes into the local buffer. In this way buffered streams will
+ * cascade harmlessly.
+ */
+ if (len >= getBufIfOpen().length && markpos < 0) {
+ return getInIfOpen().read(b, off, len);
+ }
+ fill();
+ avail = count - pos;
+ if (avail <= 0) {
+ return -1;
+ }
+ }
+ int cnt = (avail < len) ? avail : len;
+ System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
+ pos += cnt;
+ return cnt;
+ }
+
+ /**
+ * Reads up to <code>len</code> bytes into <code>b</code>, starting at index
+ * <code>off</code>, following the contract of
+ * {@link InputStream#read(byte[], int, int)}.
+ * <p>
+ * Data is pulled repeatedly until one of the following happens:
+ * <code>len</code> bytes have been delivered, an attempt yields no data
+ * (for example at end-of-file), or the underlying stream's
+ * <code>available()</code> reports that no more bytes are ready.
+ * If the very first attempt yields no data, that attempt's result
+ * (normally <code>-1</code>) is returned.
+ *
+ * @param b destination buffer.
+ * @param off offset at which to start storing bytes.
+ * @param len maximum number of bytes to read.
+ * @return the number of bytes read, or <code>-1</code> if the end of
+ * the stream has been reached.
+ * @exception IOException if the internal buffer has been released, or
+ * the underlying stream fails.
+ * @exception IndexOutOfBoundsException if <code>off</code> or <code>len</code>
+ * is negative, or <code>off + len</code> exceeds <code>b.length</code>.
+ */
+ @Override
+ public int read(byte[] b, int off, int len)
+ throws IOException {
+ getBufIfOpen(); // Check for closed stream
+ // A single OR sets the sign bit if any term is negative (including overflow of off + len).
+ if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (len == 0) {
+ return 0;
+ }
+
+ int total = 0;
+ while (true) {
+ final int chunk = read1(b, off + total, len - total);
+ if (chunk <= 0) {
+ return (total == 0) ? chunk : total;
+ }
+ total += chunk;
+ if (total >= len) {
+ return total;
+ }
+ // Stop rather than block when the source has nothing ready right now.
+ final InputStream source = in;
+ if (source != null && source.available() <= 0) {
+ return total;
+ }
+ }
+ }
+
+ /**
+ * Skips over up to <code>n</code> bytes, following the general contract of
+ * {@link InputStream#skip(long)}. Buffered bytes are skipped first. When
+ * the buffer is empty and no mark is active, the skip is delegated to the
+ * underlying stream; with an active mark, data is buffered first so that
+ * <code>reset()</code> can still return to it.
+ *
+ * @param n the number of bytes to skip; non-positive values skip nothing.
+ * @return the number of bytes actually skipped.
+ * @exception IOException if the internal buffer has been released, or
+ * the underlying stream fails.
+ */
+ @Override
+ public long skip(long n) throws IOException {
+ getBufIfOpen(); // Check for closed stream
+ if (n <= 0) {
+ return 0;
+ }
+
+ long buffered = count - pos;
+ if (buffered <= 0) {
+ if (markpos < 0) {
+ // Nothing to preserve for reset(): let the source skip directly.
+ return getInIfOpen().skip(n);
+ }
+ // A mark is active, so bring the bytes into the buffer instead of discarding them.
+ fill();
+ buffered = count - pos;
+ if (buffered <= 0) {
+ return 0;
+ }
+ }
+
+ final long skipped = Math.min(buffered, n);
+ pos += skipped;
+ return skipped;
+ }
+
+ /**
+ * Returns an estimate of how many bytes can be read or skipped without
+ * blocking: the bytes still buffered (<code>count - pos</code>) plus whatever
+ * the underlying stream's <code>available()</code> reports. The sum saturates
+ * at <code>Integer.MAX_VALUE</code> instead of overflowing.
+ *
+ * @return an estimate of the number of bytes that can be read (or skipped
+ * over) from this input stream without blocking.
+ * @exception IOException if there is no underlying stream, or the
+ * underlying stream fails.
+ */
+ @Override
+ public int available() throws IOException {
+ final int buffered = count - pos;
+ final int fromSource = getInIfOpen().available();
+ if (buffered > Integer.MAX_VALUE - fromSource) {
+ return Integer.MAX_VALUE;
+ }
+ return buffered + fromSource;
+ }
+
+ /**
+ * Records the current buffer position so that a later <code>reset()</code>
+ * can return to it. The mark may be discarded once reading has advanced
+ * far enough beyond it that the buffer would have to exceed
+ * <code>readlimit</code> bytes to keep it.
+ *
+ * @param readlimit the maximum limit of bytes that can be read before
+ * the mark position becomes invalid.
+ * @see java.io.BufferedInputStream#reset()
+ */
+ @Override
+ public void mark(int readlimit) {
+ markpos = pos;
+ marklimit = readlimit;
+ }
+
+ /**
+ * Moves the read position back to the most recent mark.
+ * <p>
+ * If no mark is set, or the mark was invalidated while refilling the
+ * buffer (<code>markpos</code> is <code>-1</code>), an
+ * <code>IOException</code> is thrown instead.
+ *
+ * @exception IOException if there is no valid mark, or the internal buffer
+ * has been released.
+ * @see java.io.BufferedInputStream#mark(int)
+ */
+ @Override
+ public void reset() throws IOException {
+ getBufIfOpen(); // Cause exception if closed
+ if (markpos >= 0) {
+ pos = markpos;
+ return;
+ }
+ throw new IOException("Resetting to invalid mark");
+ }
+
+ /**
+ * Tests if this input stream supports the <code>mark</code>
+ * and <code>reset</code> methods. This implementation always returns
+ * <code>true</code>: bytes read after a mark are retained in the internal
+ * buffer so that <code>reset</code> can return to them.
+ *
+ * @return <code>true</code>, always.
+ * @see java.io.InputStream#mark(int)
+ * @see java.io.InputStream#reset()
+ */
+ @Override
+ public boolean markSupported() {
+ return true;
}
- public BufferedInputStream(final InputStream in, final int size) {
- super(in, size);
+ /**
+ * Closes this input stream and releases any system resources
+ * associated with the stream.
+ * Once the stream has been closed, further read(), available(), reset(),
+ * or skip() invocations will throw an IOException.
+ * Closing a previously closed stream has no effect.
+ *
+ * @exception IOException if an I/O error occurs.
+ */
+ @Override
+ public void close() throws IOException {
+ this.in.close();
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
index a30c14e..cb9ab36 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
@@ -210,4 +210,42 @@ public class FormatUtils {
return sb.toString();
}
+ /**
+ * Formats nanoseconds in the format:
+ * 3 seconds, 8 millis, 3 nanos - if includeTotalNanos = false,
+ * 3 seconds, 8 millis, 3 nanos (3008000003 nanos) - if includeTotalNanos = true
+ * <p>
+ * The seconds component is omitted for values below one second, and the millis component
+ * is omitted for values below one millisecond. Exact boundaries are treated as whole units,
+ * so 1000000 nanos formats as "1 millis, 0 nanos". NOTE: negative values are not decomposed;
+ * only (nanos % 1000000) is shown for them.
+ *
+ * @param nanos the number of nanoseconds to format
+ * @param includeTotalNanos whether or not to include the total number of nanoseconds in parentheses in the returned value
+ * @return a human-readable String that is a formatted representation of the given number of nanoseconds.
+ */
+ public static String formatNanos(final long nanos, final boolean includeTotalNanos) {
+ final StringBuilder sb = new StringBuilder();
+
+ // Use >= so exact boundaries are not lost (previously 1000000 nanos rendered as "0 nanos").
+ final long seconds = nanos >= 1000000000L ? nanos / 1000000000L : 0L;
+ final long totalMillis = nanos >= 1000000L ? nanos / 1000000L : 0L;
+ final long nanosLeft = nanos % 1000000L;
+
+ if (seconds > 0) {
+ sb.append(seconds).append(" seconds");
+ }
+ if (totalMillis > 0) {
+ if (seconds > 0) {
+ sb.append(", ");
+ }
+ // Only the milliseconds not already accounted for by whole seconds.
+ sb.append(totalMillis - seconds * 1000L).append(" millis");
+ }
+ if (seconds > 0 || totalMillis > 0) {
+ sb.append(", ");
+ }
+ sb.append(nanosLeft).append(" nanos");
+
+ if (includeTotalNanos) {
+ sb.append(" (").append(nanos).append(" nanos)");
+ }
+
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/CountSizeEntityAccess.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/CountSizeEntityAccess.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/CountSizeEntityAccess.java
new file mode 100644
index 0000000..ddbf29b
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/CountSizeEntityAccess.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.util.timebuffer;
+
+/**
+ * {@link EntityAccess} implementation for {@link TimedCountSize} entries.
+ */
+public class CountSizeEntityAccess implements EntityAccess<TimedCountSize> {
+
+ /**
+ * Combines two entries by summing their counts and sizes. A null argument is
+ * treated as absent: the other argument is returned unchanged, and if both are
+ * null a fresh zero-valued entry is produced.
+ */
+ @Override
+ public TimedCountSize aggregate(final TimedCountSize oldValue, final TimedCountSize toAdd) {
+ if (oldValue == null) {
+ return (toAdd != null) ? toAdd : new TimedCountSize(0L, 0L);
+ }
+ if (toAdd == null) {
+ return oldValue;
+ }
+ final long combinedCount = oldValue.getCount() + toAdd.getCount();
+ final long combinedSize = oldValue.getSize() + toAdd.getSize();
+ return new TimedCountSize(combinedCount, combinedSize);
+ }
+
+ /**
+ * Returns a new entry with zero count and zero size.
+ */
+ @Override
+ public TimedCountSize createNew() {
+ return new TimedCountSize(0L, 0L);
+ }
+
+ /**
+ * Returns the entry's timestamp, or 0 when the entry is null.
+ */
+ @Override
+ public long getTimestamp(final TimedCountSize entity) {
+ if (entity == null) {
+ return 0L;
+ }
+ return entity.getTimestamp();
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedCountSize.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedCountSize.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedCountSize.java
new file mode 100644
index 0000000..f1df707
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedCountSize.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.util.timebuffer;
+
+/**
+ * A count and a size, stamped with the value of System.currentTimeMillis()
+ * taken when the instance was constructed. All fields are final.
+ */
+public class TimedCountSize {
+ private final long count;
+ private final long size;
+ private final long timestamp;
+
+ public TimedCountSize(final long count, final long size) {
+ this.timestamp = System.currentTimeMillis();
+ this.count = count;
+ this.size = size;
+ }
+
+ /**
+ * @return the count supplied at construction
+ */
+ public long getCount() {
+ return this.count;
+ }
+
+ /**
+ * @return the size supplied at construction
+ */
+ public long getSize() {
+ return this.size;
+ }
+
+ /**
+ * @return the creation time in milliseconds since the epoch
+ */
+ public long getTimestamp() {
+ return this.timestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index f20f917..12e2b10 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -80,7 +80,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
private final Path partialPath;
private final Path snapshotPath;
- private final SerDe<T> serde;
+ private final SerDeFactory<T> serdeFactory;
private final SyncListener syncListener;
private final FileChannel lockChannel;
private final AtomicLong transactionIdGenerator = new AtomicLong(0L);
@@ -105,7 +105,15 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
private volatile boolean recovered = false;
+ /**
+ * Creates a write-ahead log rooted at a single directory, wrapping the given
+ * SerDe in a SingletonSerDeFactory.
+ *
+ * @param path the single base directory for the log
+ * @param partitionCount the number of partitions/journals to use
+ * @param serde the serializer/deserializer for records
+ * @param syncListener the listener
+ * @throws IOException if unable to initialize due to IO issue
+ */
public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
- this(new TreeSet<>(Collections.singleton(path)), partitionCount, serde, syncListener);
+ this(new TreeSet<>(Collections.singleton(path)), partitionCount, new SingletonSerDeFactory<T>(serde), syncListener);
+ }
+
+ /**
+ * Creates a write-ahead log rooted at a single directory, obtaining SerDe
+ * instances from the given factory.
+ *
+ * @param path the single base directory for the log
+ * @param partitionCount the number of partitions/journals to use
+ * @param serdeFactory the factory for the serializer/deserializer for records
+ * @param syncListener the listener
+ * @throws IOException if unable to initialize due to IO issue
+ */
+ public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDeFactory<T> serdeFactory, final SyncListener syncListener) throws IOException {
+ this(new TreeSet<>(Collections.singleton(path)), partitionCount, serdeFactory, syncListener);
+ }
+
+ /**
+ * Creates a write-ahead log over the given set of base directories, wrapping
+ * the given SerDe in a SingletonSerDeFactory.
+ *
+ * @param paths the base directories for the log's partitions
+ * @param partitionCount the number of partitions/journals to use
+ * @param serde the serializer/deserializer for records
+ * @param syncListener the listener
+ * @throws IOException if unable to initialize due to IO issue
+ */
+ public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
+ this(paths, partitionCount, new SingletonSerDeFactory<T>(serde), syncListener);
}
/**
@@ -116,16 +124,16 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
* @param partitionCount the number of partitions/journals to use. For best
* performance, this should be close to the number of threads that are
* expected to update the repository simultaneously
- * @param serde the serializer/deserializer for records
+ * @param serdeFactory the factory for the serializer/deserializer for records
* @param syncListener the listener
* @throws IOException if unable to initialize due to IO issue
*/
@SuppressWarnings("unchecked")
- public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
+ public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDeFactory<T> serdeFactory, final SyncListener syncListener) throws IOException {
this.syncListener = syncListener;
requireNonNull(paths);
- requireNonNull(serde);
+ requireNonNull(serdeFactory);
if (paths.isEmpty()) {
throw new IllegalArgumentException("Paths must be non-empty");
@@ -172,7 +180,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
this.basePath = paths.iterator().next();
this.partialPath = basePath.resolve("snapshot.partial");
this.snapshotPath = basePath.resolve("snapshot");
- this.serde = serde;
+ this.serdeFactory = serdeFactory;
final Path lockPath = basePath.resolve("wali.lock");
lockChannel = new FileOutputStream(lockPath.toFile()).getChannel();
@@ -189,7 +197,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
final Path partitionBasePath = pathIterator.next();
- partitions[i] = new Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, getVersion());
+ partitions[i] = new Partition<>(partitionBasePath.resolve("partition-" + i), serdeFactory, i, getVersion());
}
}
@@ -242,13 +250,13 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
}
for (final T record : records) {
- final UpdateType updateType = serde.getUpdateType(record);
- final Object recordIdentifier = serde.getRecordIdentifier(record);
+ final UpdateType updateType = serdeFactory.getUpdateType(record);
+ final Object recordIdentifier = serdeFactory.getRecordIdentifier(record);
if (updateType == UpdateType.DELETE) {
recordMap.remove(recordIdentifier);
} else if (updateType == UpdateType.SWAP_OUT) {
- final String newLocation = serde.getLocation(record);
+ final String newLocation = serdeFactory.getLocation(record);
if (newLocation == null) {
logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but "
+ "no indicator of where the Record is to be Swapped Out to; these records may be "
@@ -258,7 +266,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
this.externalLocations.add(newLocation);
}
} else if (updateType == UpdateType.SWAP_IN) {
- final String newLocation = serde.getLocation(record);
+ final String newLocation = serdeFactory.getLocation(record);
if (newLocation == null) {
logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but no "
+ "indicator of where the Record is to be Swapped In from; these records may be duplicated "
@@ -360,11 +368,14 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
+ waliImplementationVersion + " of the " + waliImplementationClass + " class; cannot restore using Version " + getVersion());
}
- dataIn.readUTF(); // ignore serde class name for now
+ final String serdeEncoding = dataIn.readUTF(); // ignore serde class name for now
final int serdeVersion = dataIn.readInt();
final long maxTransactionId = dataIn.readLong();
final int numRecords = dataIn.readInt();
+ final SerDe<T> serde = serdeFactory.createSerDe(serdeEncoding);
+ serde.readHeader(dataIn);
+
for (int i = 0; i < numRecords; i++) {
final T record = serde.deserializeRecord(dataIn, serdeVersion);
if (record == null) {
@@ -491,6 +502,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
long stopTheWorldNanos = -1L;
long stopTheWorldStart = -1L;
try {
+ final List<OutputStream> partitionStreams = new ArrayList<>();
+
writeLock.lock();
try {
stopTheWorldStart = System.nanoTime();
@@ -512,25 +525,48 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
swapLocations = new HashSet<>(externalLocations);
for (final Partition<T> partition : partitions) {
try {
- partition.rollover();
+ partitionStreams.add(partition.rollover());
} catch (final Throwable t) {
partition.blackList();
numberBlackListedPartitions.getAndIncrement();
throw t;
}
}
-
- // notify global sync with the write lock held. We do this because we don't want the repository to get updated
- // while the listener is performing its necessary tasks
- if (syncListener != null) {
- syncListener.onGlobalSync();
- }
} finally {
writeLock.unlock();
}
stopTheWorldNanos = System.nanoTime() - stopTheWorldStart;
+ // Close all of the Partitions' Output Streams. We do this here, instead of in Partition.rollover()
+ // because we want to do this outside of the write lock. Because calling close() on FileOutputStream can
+ // be very expensive, as it has to flush the data to disk, we don't want to prevent other Process Sessions
+ // from getting committed. Since rollover() transitions the partition to write to a new file already, there
+ // is no reason that we need to close this FileOutputStream before releasing the write lock. Also, if any Exception
+ // does get thrown when calling close(), we don't need to blacklist the partition, as the stream that was getting
+ // closed is not the stream being written to for the partition anyway. We also catch any IOException and wait until
+ // after we've attempted to close all streams before we throw an Exception, to avoid resource leaks if one of them
+ // is unable to be closed (due to out of storage space, for instance).
+ IOException failure = null;
+ for (final OutputStream partitionStream : partitionStreams) {
+ try {
+ partitionStream.close();
+ } catch (final IOException e) {
+ failure = e;
+ }
+ }
+ if (failure != null) {
+ throw failure;
+ }
+
+ // notify global sync with the write lock held. We do this because we don't want the repository to get updated
+ // while the listener is performing its necessary tasks
+ if (syncListener != null) {
+ syncListener.onGlobalSync();
+ }
+
+ final SerDe<T> serde = serdeFactory.createSerDe(null);
+
// perform checkpoint, writing to .partial file
fileOut = new FileOutputStream(partialPath.toFile());
dataOut = new DataOutputStream(fileOut);
@@ -540,6 +576,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
dataOut.writeInt(serde.getVersion());
dataOut.writeLong(maxTransactionId);
dataOut.writeInt(records.size());
+ serde.writeHeader(dataOut);
for (final T record : records) {
logger.trace("Checkpointing {}", record);
@@ -627,7 +664,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
public static final String JOURNAL_EXTENSION = ".journal";
private static final Pattern JOURNAL_FILENAME_PATTERN = Pattern.compile("\\d+\\.journal");
- private final SerDe<S> serde;
+ private final SerDeFactory<S> serdeFactory;
+ private SerDe<S> serde;
private final Path editDirectory;
private final int writeAheadLogVersion;
@@ -650,9 +688,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
private final Queue<Path> recoveryFiles;
- public Partition(final Path path, final SerDe<S> serde, final int partitionIndex, final int writeAheadLogVersion) throws IOException {
+ public Partition(final Path path, final SerDeFactory<S> serdeFactory, final int partitionIndex, final int writeAheadLogVersion) throws IOException {
this.editDirectory = path;
- this.serde = serde;
+ this.serdeFactory = serdeFactory;
final File file = path.toFile();
if (!file.exists() && !file.mkdirs()) {
@@ -744,24 +782,16 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
*
* @throws IOException if failure to rollover
*/
- public void rollover() throws IOException {
+ public OutputStream rollover() throws IOException {
lock.lock();
try {
// Note that here we are closing fileOut and NOT dataOut. See the note in the close()
// method to understand the logic behind this.
- final OutputStream out = fileOut;
- if (out != null) {
- try {
- out.close();
- } catch (final IOException ioe) {
- dataOut = null;
- fileOut = null;
-
- blackList();
- throw ioe;
- }
- }
+ final OutputStream oldOutputStream = fileOut;
+ dataOut = null;
+ fileOut = null;
+ this.serde = serdeFactory.createSerDe(null);
final Path editPath = getNewEditPath();
final FileOutputStream fos = new FileOutputStream(editPath.toFile());
try {
@@ -770,10 +800,18 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
outStream.writeInt(writeAheadLogVersion);
outStream.writeUTF(serde.getClass().getName());
outStream.writeInt(serde.getVersion());
+ serde.writeHeader(outStream);
+
outStream.flush();
dataOut = outStream;
fileOut = fos;
} catch (final IOException ioe) {
+ try {
+ oldOutputStream.close();
+ } catch (final IOException ioe2) {
+ ioe.addSuppressed(ioe2);
+ }
+
logger.error("Failed to create new journal for {} due to {}", new Object[] {this, ioe.toString()}, ioe);
try {
fos.close();
@@ -790,6 +828,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
currentJournalFilename = editPath.toFile().getName();
blackListed = false;
+ return oldOutputStream;
} finally {
lock.unlock();
}
@@ -959,9 +998,11 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
+ "WALI version " + waliVersion + ", but the version used to restore it is only " + writeAheadLogVersion);
}
- @SuppressWarnings("unused")
- final String serdeClassName = recoveryIn.readUTF();
+ final String serdeEncoding = recoveryIn.readUTF();
this.recoveryVersion = recoveryIn.readInt();
+ serde = serdeFactory.createSerDe(serdeEncoding);
+
+ serde.readHeader(recoveryIn);
break;
}
@@ -1009,12 +1050,15 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
final Path newEditPath = getNewEditPath();
+ this.serde = serdeFactory.createSerDe(null);
final FileOutputStream fos = new FileOutputStream(newEditPath.toFile());
final DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(fos));
outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName());
outStream.writeInt(writeAheadLogVersion);
outStream.writeUTF(serde.getClass().getName());
outStream.writeInt(serde.getVersion());
+ serde.writeHeader(outStream);
+
outStream.flush();
dataOut = outStream;
fileOut = fos;
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java
index cc984a6..d1919e7 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java
@@ -30,6 +30,15 @@ import java.util.Map;
public interface SerDe<T> {
/**
+ * Provides the SerDe a chance to write header information to the given output stream before any records are written. The default implementation writes nothing.
+ *
+ * @param out the DataOutputStream to write to
+ * @throws IOException if unable to write to the DataOutputStream
+ */
+ default void writeHeader(DataOutputStream out) throws IOException {
+ }
+
+ /**
* <p>
* Serializes an Edit Record to the log via the given
* {@link DataOutputStream}.
@@ -55,6 +64,15 @@ public interface SerDe<T> {
void serializeRecord(T record, DataOutputStream out) throws IOException;
/**
+ * Provides the SerDe the opportunity to read header information before deserializing any records. The default implementation reads nothing.
+ *
+ * @param in the DataInputStream to read from
+ * @throws IOException if unable to read from the DataInputStream
+ */
+ default void readHeader(DataInputStream in) throws IOException {
+ }
+
+ /**
* <p>
* Reads an Edit Record from the given {@link DataInputStream} and merges
* that edit with the current version of the record, returning the new,
@@ -65,9 +83,9 @@ public interface SerDe<T> {
*
* @param in to deserialize from
* @param currentRecordStates an unmodifiable map of Record ID's to the
- * current state of that record
+ * current state of that record
* @param version the version of the SerDe that was used to serialize the
- * edit record
+ * edit record
* @return deserialized record
* @throws IOException if failure reading
*/
@@ -125,4 +143,12 @@ public interface SerDe<T> {
* @return version
*/
int getVersion();
+
+ /**
+ * Closes any resources that the SerDe is holding open. The default implementation does nothing.
+ *
+ * @throws IOException if unable to close resources
+ */
+ default void close() throws IOException {
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDeFactory.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDeFactory.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDeFactory.java
new file mode 100644
index 0000000..09e6f7b
--- /dev/null
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDeFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.wali;
+
+public interface SerDeFactory<T> {
+
+ /**
+ * Returns a SerDe to use for serializing or deserializing records; implementations may return the same instance on every call
+ *
+ * @param encodingName the name of encoding that was used when writing the serialized data, or <code>null</code> if
+ * the SerDe is to be used for serialization purposes
+ * @return a SerDe (not necessarily a newly created instance)
+ */
+ SerDe<T> createSerDe(String encodingName);
+
+ /**
+ * Returns the unique ID for the given record
+ *
+ * @param record to obtain identifier for
+ * @return identifier of record
+ */
+ Object getRecordIdentifier(T record);
+
+ /**
+ * Returns the UpdateType for the given record
+ *
+ * @param record to retrieve update type for
+ * @return update type
+ */
+ UpdateType getUpdateType(T record);
+
+ /**
+ * Returns the external location of the given record; this is used when a
+ * record is moved away from WALI or is being re-introduced to WALI. For
+ * example, WALI can be updated with a record of type
+ * {@link UpdateType#SWAP_OUT} that indicates a Location of
+ * file://tmp/external1 and can then be re-introduced to WALI by updating
+ * WALI with a record of type {@link UpdateType#CREATE} that indicates a
+ * Location of file://tmp/external1
+ *
+ * @param record to get location of
+ * @return location
+ */
+ String getLocation(T record);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SingletonSerDeFactory.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SingletonSerDeFactory.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SingletonSerDeFactory.java
new file mode 100644
index 0000000..403f082
--- /dev/null
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SingletonSerDeFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.wali;
+
+public class SingletonSerDeFactory<T> implements SerDeFactory<T> { // SerDeFactory that always supplies the one SerDe passed to its constructor
+ private final SerDe<T> serde; // the single SerDe returned by every createSerDe() call and used for all delegated lookups
+
+ public SingletonSerDeFactory(final SerDe<T> serde) {
+ this.serde = serde;
+ }
+
+ @Override
+ public SerDe<T> createSerDe(final String encodingName) { // encodingName is ignored: reading and writing both get the same instance
+ return serde; // NOTE(review): any state the wrapped SerDe keeps (e.g. from readHeader/writeHeader) is shared by all callers - confirm intended
+ }
+
+ @Override
+ public Object getRecordIdentifier(final T record) {
+ return serde.getRecordIdentifier(record); // delegate to the wrapped SerDe
+ }
+
+ @Override
+ public UpdateType getUpdateType(final T record) {
+ return serde.getUpdateType(record); // delegate to the wrapped SerDe
+ }
+
+ @Override
+ public String getLocation(final T record) {
+ return serde.getLocation(record); // delegate to the wrapped SerDe
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/pom.xml b/nifi-commons/pom.xml
index 558110a..27810cd 100644
--- a/nifi-commons/pom.xml
+++ b/nifi-commons/pom.xml
@@ -38,5 +38,6 @@
<module>nifi-site-to-site-client</module>
<module>nifi-hl7-query-language</module>
<module>nifi-hadoop-utils</module>
+ <module>nifi-schema-utils</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
index bb78896..7e4495b 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
@@ -64,4 +64,28 @@ public interface ResourceClaim extends Comparable<ResourceClaim> {
* @return <code>true</code> if the Resource Claim is in use, <code>false</code> otherwise
*/
boolean isInUse();
+
+
+ /**
+ * Provides the natural ordering for ResourceClaim objects. By default they are sorted by their id, then container, then section, each compared lexicographically as Strings
+ *
+ * @param other other claim
+ * @return x such that x <= -1 if this is less than other;
+ * x=0 if this claim's id, container and section are all equal to those of other (equals() should agree with this for consistency);
+ * x >= 1 if this is greater than other
+ */
+ @Override
+ default int compareTo(final ResourceClaim other) {
+ final int idComparison = getId().compareTo(other.getId());
+ if (idComparison != 0) {
+ return idComparison;
+ }
+
+ final int containerComparison = getContainer().compareTo(other.getContainer());
+ if (containerComparison != 0) {
+ return containerComparison;
+ }
+
+ return getSection().compareTo(other.getSection());
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index fdb5a83..7ddae36 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -49,6 +49,10 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-utils</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 350cceb..208bbce 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller;
import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
@@ -28,34 +29,31 @@ import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.regex.Pattern;
+
import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.IncompleteSwapFileException;
-import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.SwapSummary;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.controller.swap.StandardSwapContents;
-import org.apache.nifi.controller.swap.StandardSwapSummary;
+import org.apache.nifi.controller.swap.SchemaSwapDeserializer;
+import org.apache.nifi.controller.swap.SchemaSwapSerializer;
+import org.apache.nifi.controller.swap.SimpleSwapDeserializer;
+import org.apache.nifi.controller.swap.SwapDeserializer;
+import org.apache.nifi.controller.swap.SwapSerializer;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,6 +81,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
private EventReporter eventReporter;
private ResourceClaimManager claimManager;
+ private static final byte[] MAGIC_HEADER = {'S', 'W', 'A', 'P'};
+
/**
* Default no args constructor for service loading only.
*/
@@ -116,8 +116,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part");
final String swapLocation = swapFile.getAbsolutePath();
- try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) {
- serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
+ final SwapSerializer serializer = new SchemaSwapSerializer();
+ try (final FileOutputStream fos = new FileOutputStream(swapTempFile);
+ final OutputStream out = new BufferedOutputStream(fos)) {
+ out.write(MAGIC_HEADER);
+ final DataOutputStream dos = new DataOutputStream(out);
+ dos.writeUTF(serializer.getSerializationName());
+
+ serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, out);
fos.getFD().sync();
} catch (final IOException ioe) {
// we failed to write out the entire swap file. Delete the temporary file, if we can.
@@ -154,14 +160,13 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
throw new FileNotFoundException("Failed to swap in FlowFiles from external storage location " + swapLocation + " into FlowFile Queue because the file could not be found");
}
- final SwapContents swapContents;
try (final InputStream fis = new FileInputStream(swapFile);
final InputStream bis = new BufferedInputStream(fis);
final DataInputStream in = new DataInputStream(bis)) {
- swapContents = deserializeFlowFiles(in, swapLocation, flowFileQueue, claimManager);
- }
- return swapContents;
+ final SwapDeserializer deserializer = createSwapDeserializer(in);
+ return deserializer.deserializeFlowFiles(in, swapLocation, flowFileQueue, claimManager);
+ }
}
@Override
@@ -210,11 +215,13 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
// "<timestamp>-<queue identifier>-<random uuid>.swap". If we have two dashes, then we can just check if the queue ID is equal
// to the id of the queue given and if not we can just move on.
final String[] splits = swapFile.getName().split("-");
- if (splits.length == 3) {
- final String queueIdentifier = splits[1];
- if (!queueIdentifier.equals(flowFileQueue.getIdentifier())) {
- continue;
+ if (splits.length > 6) {
+ final String queueIdentifier = splits[1] + "-" + splits[2] + "-" + splits[3] + "-" + splits[4] + "-" + splits[5];
+ if (queueIdentifier.equals(flowFileQueue.getIdentifier())) {
+ swapLocations.add(swapFile.getAbsolutePath());
}
+
+ continue;
}
// Read the queue identifier from the swap file to check if the swap file is for this queue
@@ -222,18 +229,22 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final InputStream bufferedIn = new BufferedInputStream(fis);
final DataInputStream in = new DataInputStream(bufferedIn)) {
- final int swapEncodingVersion = in.readInt();
- if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
- final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
- + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
-
+ final SwapDeserializer deserializer;
+ try {
+ deserializer = createSwapDeserializer(in);
+ } catch (final Exception e) {
+ final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " due to " + e;
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
throw new IOException(errMsg);
}
- final String connectionId = in.readUTF();
- if (connectionId.equals(flowFileQueue.getIdentifier())) {
- swapLocations.add(swapFile.getAbsolutePath());
+ // If deserializer is not an instance of Simple Swap Deserializer, then it means that the serializer is new enough that
+ // we use the 3-element filename as illustrated above, so this is only necessary for the SimpleSwapDeserializer.
+ if (deserializer instanceof SimpleSwapDeserializer) {
+ final String connectionId = in.readUTF();
+ if (connectionId.equals(flowFileQueue.getIdentifier())) {
+ swapLocations.add(swapFile.getAbsolutePath());
+ }
}
}
}
@@ -251,353 +262,36 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final InputStream bufferedIn = new BufferedInputStream(fis);
final DataInputStream in = new DataInputStream(bufferedIn)) {
- final int swapEncodingVersion = in.readInt();
- if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
- final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
- + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
-
- eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
- throw new IOException(errMsg);
- }
-
- final int numRecords;
- final long contentSize;
- Long maxRecordId = null;
- try {
- in.readUTF(); // ignore Connection ID
- numRecords = in.readInt();
- contentSize = in.readLong();
-
- if (numRecords == 0) {
- return StandardSwapSummary.EMPTY_SUMMARY;
- }
-
- if (swapEncodingVersion > 7) {
- maxRecordId = in.readLong();
- }
- } catch (final EOFException eof) {
- logger.warn("Found premature End-of-File when reading Swap File {}. EOF occurred before any FlowFiles were encountered", swapLocation);
- return StandardSwapSummary.EMPTY_SUMMARY;
- }
-
- final QueueSize queueSize = new QueueSize(numRecords, contentSize);
- final SwapContents swapContents = deserializeFlowFiles(in, queueSize, maxRecordId, swapEncodingVersion, true, claimManager, swapLocation);
- return swapContents.getSummary();
- }
- }
-
- public static int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
- if (toSwap == null || toSwap.isEmpty()) {
- return 0;
- }
-
- long contentSize = 0L;
- for (final FlowFileRecord record : toSwap) {
- contentSize += record.getSize();
- }
-
- // persist record to disk via the swap file
- final OutputStream bufferedOut = new BufferedOutputStream(destination);
- final DataOutputStream out = new DataOutputStream(bufferedOut);
- try {
- out.writeInt(SWAP_ENCODING_VERSION);
- out.writeUTF(queue.getIdentifier());
- out.writeInt(toSwap.size());
- out.writeLong(contentSize);
-
- // get the max record id and write that out so that we know it quickly for restoration
- long maxRecordId = 0L;
- for (final FlowFileRecord flowFile : toSwap) {
- if (flowFile.getId() > maxRecordId) {
- maxRecordId = flowFile.getId();
- }
- }
-
- out.writeLong(maxRecordId);
-
- for (final FlowFileRecord flowFile : toSwap) {
- out.writeLong(flowFile.getId());
- out.writeLong(flowFile.getEntryDate());
- out.writeLong(flowFile.getLineageStartDate());
- out.writeLong(flowFile.getLineageStartIndex());
- out.writeLong(flowFile.getLastQueueDate());
- out.writeLong(flowFile.getQueueDateIndex());
- out.writeLong(flowFile.getSize());
-
- final ContentClaim claim = flowFile.getContentClaim();
- if (claim == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- final ResourceClaim resourceClaim = claim.getResourceClaim();
- out.writeUTF(resourceClaim.getId());
- out.writeUTF(resourceClaim.getContainer());
- out.writeUTF(resourceClaim.getSection());
- out.writeLong(claim.getOffset());
- out.writeLong(claim.getLength());
- out.writeLong(flowFile.getContentClaimOffset());
- out.writeBoolean(resourceClaim.isLossTolerant());
- }
-
- final Map<String, String> attributes = flowFile.getAttributes();
- out.writeInt(attributes.size());
- for (final Map.Entry<String, String> entry : attributes.entrySet()) {
- writeString(entry.getKey(), out);
- writeString(entry.getValue(), out);
- }
- }
- } finally {
- out.flush();
+ final SwapDeserializer deserializer = createSwapDeserializer(in);
+ return deserializer.getSwapSummary(in, swapLocation, claimManager);
}
-
- logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", toSwap.size(), queue, swapLocation);
-
- return toSwap.size();
}
- private static void writeString(final String toWrite, final OutputStream out) throws IOException {
- final byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8);
- final int utflen = bytes.length;
- if (utflen < 65535) {
- out.write(utflen >>> 8);
- out.write(utflen);
- out.write(bytes);
- } else {
- out.write(255);
- out.write(255);
- out.write(utflen >>> 24);
- out.write(utflen >>> 16);
- out.write(utflen >>> 8);
- out.write(utflen);
- out.write(bytes);
- }
- }
-
- static SwapContents deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
- final int swapEncodingVersion = in.readInt();
- if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
- throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is "
- + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
- }
+ private SwapDeserializer createSwapDeserializer(final DataInputStream dis) throws IOException {
+ dis.mark(MAGIC_HEADER.length);
- final String connectionId = in.readUTF(); // Connection ID
- if (!connectionId.equals(queue.getIdentifier())) {
- throw new IllegalArgumentException("Cannot deserialize FlowFiles from Swap File at location " + swapLocation
- + " because those FlowFiles belong to Connection with ID " + connectionId + " and an attempt was made to swap them into a Connection with ID " + queue.getIdentifier());
- }
-
- int numRecords = 0;
- long contentSize = 0L;
- Long maxRecordId = null;
+ final byte[] magicHeader = new byte[MAGIC_HEADER.length];
try {
- numRecords = in.readInt();
- contentSize = in.readLong(); // Content Size
- if (swapEncodingVersion > 7) {
- maxRecordId = in.readLong(); // Max Record ID
- }
+ StreamUtils.fillBuffer(dis, magicHeader);
} catch (final EOFException eof) {
- final QueueSize queueSize = new QueueSize(numRecords, contentSize);
- final SwapSummary summary = new StandardSwapSummary(queueSize, maxRecordId, Collections.emptyList());
- final SwapContents partialContents = new StandardSwapContents(summary, Collections.emptyList());
- throw new IncompleteSwapFileException(swapLocation, partialContents);
+ throw new IOException("Failed to read swap file because the file contained less than 4 bytes of data");
}
- final QueueSize queueSize = new QueueSize(numRecords, contentSize);
- return deserializeFlowFiles(in, queueSize, maxRecordId, swapEncodingVersion, false, claimManager, swapLocation);
- }
-
- private static SwapContents deserializeFlowFiles(final DataInputStream in, final QueueSize queueSize, final Long maxRecordId,
- final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager, final String location) throws IOException {
- final List<FlowFileRecord> flowFiles = new ArrayList<>(queueSize.getObjectCount());
- final List<ResourceClaim> resourceClaims = new ArrayList<>(queueSize.getObjectCount());
- Long maxId = maxRecordId;
-
- for (int i = 0; i < queueSize.getObjectCount(); i++) {
- try {
- // legacy encoding had an "action" because it used to be couple with FlowFile Repository code
- if (serializationVersion < 3) {
- final int action = in.read();
- if (action != 1) {
- throw new IOException("Swap File is version " + serializationVersion + " but did not contain a 'UPDATE' record type");
- }
- }
-
- final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
- final long recordId = in.readLong();
- if (maxId == null || recordId > maxId) {
- maxId = recordId;
- }
-
- ffBuilder.id(recordId);
- ffBuilder.entryDate(in.readLong());
-
- if (serializationVersion > 1) {
- // Lineage information was added in version 2
- if (serializationVersion < 10) {
- final int numLineageIdentifiers = in.readInt();
- for (int lineageIdIdx = 0; lineageIdIdx < numLineageIdentifiers; lineageIdIdx++) {
- in.readUTF(); //skip each identifier
- }
- }
-
- // version 9 adds in a 'lineage start index'
- final long lineageStartDate = in.readLong();
- final long lineageStartIndex;
- if (serializationVersion > 8) {
- lineageStartIndex = in.readLong();
- } else {
- lineageStartIndex = 0L;
- }
-
- ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
-
- if (serializationVersion > 5) {
- // Version 9 adds in a 'queue date index'
- final long lastQueueDate = in.readLong();
- final long queueDateIndex;
- if (serializationVersion > 8) {
- queueDateIndex = in.readLong();
- } else {
- queueDateIndex = 0L;
- }
-
- ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
- }
- }
-
- ffBuilder.size(in.readLong());
-
- if (serializationVersion < 3) {
- readString(in); // connection Id
- }
-
- final boolean hasClaim = in.readBoolean();
- ResourceClaim resourceClaim = null;
- if (hasClaim) {
- final String claimId;
- if (serializationVersion < 5) {
- claimId = String.valueOf(in.readLong());
- } else {
- claimId = in.readUTF();
- }
-
- final String container = in.readUTF();
- final String section = in.readUTF();
-
- final long resourceOffset;
- final long resourceLength;
- if (serializationVersion < 6) {
- resourceOffset = 0L;
- resourceLength = -1L;
- } else {
- resourceOffset = in.readLong();
- resourceLength = in.readLong();
- }
-
- final long claimOffset = in.readLong();
-
- final boolean lossTolerant;
- if (serializationVersion >= 4) {
- lossTolerant = in.readBoolean();
- } else {
- lossTolerant = false;
- }
-
- resourceClaim = claimManager.getResourceClaim(container, section, claimId);
- if (resourceClaim == null) {
- logger.error("Swap file indicates that FlowFile was referencing Resource Claim at container={}, section={}, claimId={}, "
- + "but this Resource Claim cannot be found! Will create a temporary Resource Claim, but this may affect the framework's "
- + "ability to properly clean up this resource", container, section, claimId);
- resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, true);
- }
-
- final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset);
- claim.setLength(resourceLength);
-
- if (incrementContentClaims) {
- claimManager.incrementClaimantCount(resourceClaim);
- }
-
- ffBuilder.contentClaim(claim);
- ffBuilder.contentClaimOffset(claimOffset);
- }
-
- boolean attributesChanged = true;
- if (serializationVersion < 3) {
- attributesChanged = in.readBoolean();
- }
-
- if (attributesChanged) {
- final int numAttributes = in.readInt();
- for (int j = 0; j < numAttributes; j++) {
- final String key = readString(in);
- final String value = readString(in);
-
- ffBuilder.addAttribute(key, value);
- }
- }
-
- final FlowFileRecord record = ffBuilder.build();
- if (resourceClaim != null) {
- resourceClaims.add(resourceClaim);
- }
-
- flowFiles.add(record);
- } catch (final EOFException eof) {
- final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxId, resourceClaims);
- final SwapContents partialContents = new StandardSwapContents(swapSummary, flowFiles);
- throw new IncompleteSwapFileException(location, partialContents);
+ if (Arrays.equals(magicHeader, MAGIC_HEADER)) {
+ final String serializationName = dis.readUTF();
+ if (serializationName.equals(SchemaSwapDeserializer.getSerializationName())) {
+ return new SchemaSwapDeserializer();
}
- }
-
- final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxId, resourceClaims);
- return new StandardSwapContents(swapSummary, flowFiles);
- }
- private static String readString(final InputStream in) throws IOException {
- final Integer numBytes = readFieldLength(in);
- if (numBytes == null) {
- throw new EOFException();
- }
- final byte[] bytes = new byte[numBytes];
- fillBuffer(in, bytes, numBytes);
- return new String(bytes, StandardCharsets.UTF_8);
- }
-
- private static Integer readFieldLength(final InputStream in) throws IOException {
- final int firstValue = in.read();
- final int secondValue = in.read();
- if (firstValue < 0) {
- return null;
- }
- if (secondValue < 0) {
- throw new EOFException();
- }
- if (firstValue == 0xff && secondValue == 0xff) {
- final int ch1 = in.read();
- final int ch2 = in.read();
- final int ch3 = in.read();
- final int ch4 = in.read();
- if ((ch1 | ch2 | ch3 | ch4) < 0) {
- throw new EOFException();
- }
- return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
+ throw new IOException("Cannot find a suitable Deserializer for swap file, written with Serialization Name '" + serializationName + "'");
} else {
- return (firstValue << 8) + secondValue;
+ // SimpleSwapDeserializer is old and did not write out a magic header.
+ dis.reset();
+ return new SimpleSwapDeserializer();
}
}
- private static void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException {
- int bytesRead;
- int totalBytesRead = 0;
- while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) {
- totalBytesRead += bytesRead;
- }
- if (totalBytesRead != length) {
- throw new EOFException();
- }
- }
private void error(final String error) {
logger.error(error);
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 68af208..e20e250 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -820,6 +820,7 @@ public class StandardFlowFileQueue implements FlowFileQueue {
long swapByteCount = 0L;
Long maxId = null;
List<ResourceClaim> resourceClaims = new ArrayList<>();
+ final long startNanos = System.nanoTime();
writeLock.lock();
try {
@@ -866,6 +867,11 @@ public class StandardFlowFileQueue implements FlowFileQueue {
writeLock.unlock("Recover Swap Files");
}
+ if (!swapLocations.isEmpty()) {
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ logger.info("Recovered {} swap files for {} in {} millis", swapLocations.size(), this, millis);
+ }
+
return new StandardSwapSummary(new QueueSize(swapFlowFileCount, swapByteCount), maxId, resourceClaims);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java
new file mode 100644
index 0000000..44ed62d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.wali.SerDe;
+import org.wali.UpdateType;
+
+/**
+ * Common base for {@link SerDe} implementations that handle {@link RepositoryRecord}s.
+ * Keeps the identifier-to-queue map supplied by the owner and implements the
+ * record-identifier, update-type and location accessors shared by concrete serdes.
+ */
+public abstract class RepositoryRecordSerde implements SerDe<RepositoryRecord> {
+    /** Queues keyed by identifier; stays null until {@link #setQueueMap(Map)} is invoked. */
+    private Map<String, FlowFileQueue> flowFileQueueMap;
+
+    /**
+     * Stores the map used by {@link #getFlowFileQueue(String)}.
+     *
+     * @param queueMap queues keyed by identifier (stored by reference, not copied)
+     */
+    protected void setQueueMap(final Map<String, FlowFileQueue> queueMap) {
+        flowFileQueueMap = queueMap;
+    }
+
+    /**
+     * @return the map last passed to {@link #setQueueMap(Map)}, or null if none was set
+     */
+    protected Map<String, FlowFileQueue> getQueueMap() {
+        return flowFileQueueMap;
+    }
+
+    /**
+     * Resolves a queue by its identifier.
+     *
+     * @param queueId identifier of the queue to look up
+     * @return the matching queue, or null if the map has no entry for {@code queueId}
+     * @throws NullPointerException if no queue map has been set
+     */
+    protected FlowFileQueue getFlowFileQueue(final String queueId) {
+        final Map<String, FlowFileQueue> queuesById = flowFileQueueMap;
+        return queuesById.get(queueId);
+    }
+
+    /**
+     * @return the id of the record's current FlowFile
+     */
+    @Override
+    public Long getRecordIdentifier(final RepositoryRecord record) {
+        return record.getCurrent().getId();
+    }
+
+    /**
+     * Translates the record's repository type into the write-ahead-log update type.
+     * CONTENTMISSING records are journaled as deletions.
+     *
+     * @return the matching update type, or null for any type not listed below
+     */
+    @Override
+    public UpdateType getUpdateType(final RepositoryRecord record) {
+        final UpdateType translated;
+        switch (record.getType()) {
+            case CREATE:
+                translated = UpdateType.CREATE;
+                break;
+            case UPDATE:
+                translated = UpdateType.UPDATE;
+                break;
+            case DELETE:
+            case CONTENTMISSING:
+                translated = UpdateType.DELETE;
+                break;
+            case SWAP_IN:
+                translated = UpdateType.SWAP_IN;
+                break;
+            case SWAP_OUT:
+                translated = UpdateType.SWAP_OUT;
+                break;
+            default:
+                translated = null;
+                break;
+        }
+        return translated;
+    }
+
+    /**
+     * @return the record's swap location, as reported by {@link RepositoryRecord#getSwapLocation()}
+     */
+    @Override
+    public String getLocation(final RepositoryRecord record) {
+        final String swapLocation = record.getSwapLocation();
+        return swapLocation;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java
new file mode 100644
index 0000000..c19fa94
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.wali.SerDe;
+import org.wali.SerDeFactory;
+import org.wali.UpdateType;
+
+public class RepositoryRecordSerdeFactory implements SerDeFactory<RepositoryRecord> {
+ private final String LEGACY_SERDE_ENCODING_NAME = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository$WriteAheadRecordSerde";
+ private final ResourceClaimManager resourceClaimManager;
+ private Map<String, FlowFileQueue> flowFileQueueMap = null;
+
+ public RepositoryRecordSerdeFactory(final ResourceClaimManager claimManager) {
+ this.resourceClaimManager = claimManager;
+ }
+
+ protected void setQueueMap(final Map<String, FlowFileQueue> queueMap) {
+ this.flowFileQueueMap = queueMap;
+ }
+
+ protected Map<String, FlowFileQueue> getQueueMap() {
+ return flowFileQueueMap;
+ }
+
+ @Override
+ public SerDe<RepositoryRecord> createSerDe(final String encodingName) {
+ if (encodingName == null || SchemaRepositoryRecordSerde.class.getName().equals(encodingName)) {
+ final SchemaRepositoryRecordSerde serde = new SchemaRepositoryRecordSerde(resourceClaimManager);
+ serde.setQueueMap(flowFileQueueMap);
+ return serde;
+ }
+
+ if (WriteAheadRepositoryRecordSerde.class.getName().equals(encodingName)
+ || LEGACY_SERDE_ENCODING_NAME.equals(encodingName)) {
+ final WriteAheadRepositoryRecordSerde serde = new WriteAheadRepositoryRecordSerde(resourceClaimManager);
+ serde.setQueueMap(flowFileQueueMap);
+ return serde;
+ }
+
+ throw new IllegalArgumentException("Cannot create Deserializer for Repository Records because the encoding '" + encodingName + "' is not known");
+ }
+
+ protected FlowFileQueue getFlowFileQueue(final String queueId) {
+ return flowFileQueueMap.get(queueId);
+ }
+
+ @Override
+ public Long getRecordIdentifier(final RepositoryRecord record) {
+ return record.getCurrent().getId();
+ }
+
+ @Override
+ public UpdateType getUpdateType(final RepositoryRecord record) {
+ switch (record.getType()) {
+ case CONTENTMISSING:
+ case DELETE:
+ return UpdateType.DELETE;
+ case CREATE:
+ return UpdateType.CREATE;
+ case UPDATE:
+ return UpdateType.UPDATE;
+ case SWAP_OUT:
+ return UpdateType.SWAP_OUT;
+ case SWAP_IN:
+ return UpdateType.SWAP_IN;
+ }
+ return null;
+ }
+
+ @Override
+ public String getLocation(final RepositoryRecord record) {
+ return record.getSwapLocation();
+ }
+
+}