You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/19 19:16:09 UTC

[35/59] [abbrv] [partial] incubator-nifi git commit: Reworked overall directory structure to make releasing nifi vs maven plugis easier

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/pom.xml
----------------------------------------------------------------------
diff --git a/commons/wali/pom.xml b/commons/wali/pom.xml
deleted file mode 100644
index 347c8cc..0000000
--- a/commons/wali/pom.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-commons-parent</artifactId>
-        <version>0.0.1-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>wali</artifactId>
-    <version>0.0.1-SNAPSHOT</version>
-    <packaging>jar</packaging>
-    
-    <name>WALI : Write-Ahead Log Implementation</name>
-    
-    <dependencies>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-utils</artifactId>
-        </dependency>
-    </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
deleted file mode 100644
index 19208d3..0000000
--- a/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ /dev/null
@@ -1,1008 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wali;
-
-import static java.util.Objects.requireNonNull;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p>
- * This implementation provides as little Locking as possible in order to
- * provide the highest throughput possible. However, this implementation is ONLY
- * appropriate if it can be guaranteed that only a single thread will ever issue
- * updates for a given Record at any one time.
- * </p>
- *
- * @param <T>
- */
-public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepository<T> {
-
-    private final Path basePath;
-    private final Path partialPath;
-    private final Path snapshotPath;
-
-    private final SerDe<T> serde;
-    private final SyncListener syncListener;
-    private final FileChannel lockChannel;
-    private final AtomicLong transactionIdGenerator = new AtomicLong(0L);
-
-    private final Partition<T>[] partitions;
-    private final AtomicLong partitionIndex = new AtomicLong(0L);
-    private final ConcurrentMap<Object, T> recordMap = new ConcurrentHashMap<>();
-    private final Map<Object, T> unmodifiableRecordMap = Collections.unmodifiableMap(recordMap);
-    private final Set<String> externalLocations = new CopyOnWriteArraySet<>();
-
-    private final Set<String> recoveredExternalLocations = new CopyOnWriteArraySet<>();
-
-    private final AtomicInteger numberBlackListedPartitions = new AtomicInteger(0);
-
-    private static final Logger logger = LoggerFactory.getLogger(MinimalLockingWriteAheadLog.class);
-
-    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-    private final Lock readLock = rwLock.readLock(); // required to update a partition
-    private final Lock writeLock = rwLock.writeLock(); // required for checkpoint
-
-    private volatile boolean updated = false;
-    private volatile boolean recovered = false;
-
-    public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
-        this(new TreeSet<>(Collections.singleton(path)), partitionCount, serde, syncListener);
-    }
-
-    /**
-     *
-     * @param paths a sorted set of Paths to use for the partitions/journals and
-     * the snapshot. The snapshot will always be written to the first path
-     * specified.
-     *
-     * @param partitionCount the number of partitions/journals to use. For best
-     * performance, this should be close to the number of threads that are
-     * expected to update the repository simultaneously
-     *
-     * @param serde
-     * @param syncListener
-     * @throws IOException
-     */
-    @SuppressWarnings("unchecked")
-    public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
-        this.syncListener = syncListener;
-
-        requireNonNull(paths);
-        requireNonNull(serde);
-
-        if (paths.isEmpty()) {
-            throw new IllegalArgumentException("Paths must be non-empty");
-        }
-
-        int existingPartitions = 0;
-        for (final Path path : paths) {
-            if (!Files.exists(path)) {
-                Files.createDirectories(path);
-            }
-
-            final File file = path.toFile();
-            if (!file.isDirectory()) {
-                throw new IOException("Path given [" + path + "] is not a directory");
-            }
-            if (!file.canWrite()) {
-                throw new IOException("Path given [" + path + "] is not writable");
-            }
-            if (!file.canRead()) {
-                throw new IOException("Path given [" + path + "] is not readable");
-            }
-            if (!file.canExecute()) {
-                throw new IOException("Path given [" + path + "] is not executable");
-            }
-
-            final File[] children = file.listFiles();
-            if (children != null) {
-                for (final File child : children) {
-                    if (child.isDirectory() && child.getName().startsWith("partition-")) {
-                        existingPartitions++;
-                    }
-                }
-
-                if (existingPartitions != 0 && existingPartitions != partitionCount) {
-                    logger.warn("Constructing MinimalLockingWriteAheadLog with partitionCount={}, but the repository currently has "
-                            + "{} partitions; ignoring argument and proceeding with {} partitions",
-                            new Object[]{partitionCount, existingPartitions, existingPartitions});
-                }
-            }
-        }
-
-        this.basePath = paths.iterator().next();
-        this.partialPath = basePath.resolve("snapshot.partial");
-        this.snapshotPath = basePath.resolve("snapshot");
-        this.serde = serde;
-
-        final Path lockPath = basePath.resolve("wali.lock");
-        lockChannel = new FileOutputStream(lockPath.toFile()).getChannel();
-        lockChannel.lock();
-
-        partitions = new Partition[partitionCount];
-
-        Iterator<Path> pathIterator = paths.iterator();
-        for (int i = 0; i < partitionCount; i++) {
-            // If we're out of paths, create a new iterator to start over.
-            if (!pathIterator.hasNext()) {
-                pathIterator = paths.iterator();
-            }
-
-            final Path partitionBasePath = pathIterator.next();
-
-            partitions[i] = new Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, getVersion());
-        }
-    }
-
-    @Override
-    public int update(final Collection<T> records, final boolean forceSync) throws IOException {
-        if (!recovered) {
-            throw new IllegalStateException("Cannot update repository until record recovery has been performed");
-        }
-
-        if (records.isEmpty()) {
-            return -1;
-        }
-
-        updated = true;
-        readLock.lock();
-        try {
-            while (true) {
-                final int numBlackListed = numberBlackListedPartitions.get();
-                if (numBlackListed >= partitions.length) {
-                    throw new IOException("All Partitions have been blacklisted due to failures when attempting to update. If the Write-Ahead Log is able to perform a checkpoint, this issue may resolve itself. Otherwise, manual intervention will be required.");
-                }
-
-                final long partitionIdx = partitionIndex.getAndIncrement();
-                final int resolvedIdx = (int) (partitionIdx % partitions.length);
-                final Partition<T> partition = partitions[resolvedIdx];
-                if (partition.tryClaim()) {
-                    try {
-                        final long transactionId = transactionIdGenerator.getAndIncrement();
-                        if (logger.isTraceEnabled()) {
-                            for (final T record : records) {
-                                logger.trace("Partition {} performing Transaction {}: {}", new Object[]{partition, transactionId, record});
-                            }
-                        }
-
-                        try {
-                            partition.update(records, transactionId, unmodifiableRecordMap, forceSync);
-                        } catch (final Exception e) {
-                            partition.blackList();
-                            numberBlackListedPartitions.incrementAndGet();
-                            throw e;
-                        }
-
-                        if (forceSync && syncListener != null) {
-                            syncListener.onSync(resolvedIdx);
-                        }
-                    } finally {
-                        partition.releaseClaim();
-                    }
-
-                    for (final T record : records) {
-                        final UpdateType updateType = serde.getUpdateType(record);
-                        final Object recordIdentifier = serde.getRecordIdentifier(record);
-
-                        if (updateType == UpdateType.DELETE) {
-                            recordMap.remove(recordIdentifier);
-                        } else if (updateType == UpdateType.SWAP_OUT) {
-                            final String newLocation = serde.getLocation(record);
-                            if (newLocation == null) {
-                                logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but no indicator of where the Record is to be Swapped Out to; these records may be lost when the repository is restored!");
-                            } else {
-                                recordMap.remove(recordIdentifier);
-                                this.externalLocations.add(newLocation);
-                            }
-                        } else if (updateType == UpdateType.SWAP_IN) {
-                            final String newLocation = serde.getLocation(record);
-                            if (newLocation == null) {
-                                logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but no indicator of where the Record is to be Swapped In from; these records may be duplicated when the repository is restored!");
-                            } else {
-                                externalLocations.remove(newLocation);
-                            }
-                            recordMap.put(recordIdentifier, record);
-                        } else {
-                            recordMap.put(recordIdentifier, record);
-                        }
-                    }
-
-                    return resolvedIdx;
-                }
-            }
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    @Override
-    public Collection<T> recoverRecords() throws IOException {
-        if (updated) {
-            throw new IllegalStateException("Cannot recover records after updating the repository; must call recoverRecords first");
-        }
-
-        final long recoverStart = System.nanoTime();
-        writeLock.lock();
-        try {
-            Long maxTransactionId = recoverFromSnapshot(recordMap);
-            recoverFromEdits(recordMap, maxTransactionId);
-
-            for (final Partition<T> partition : partitions) {
-                final long transId = partition.getMaxRecoveredTransactionId();
-                if (maxTransactionId == null || transId > maxTransactionId) {
-                    maxTransactionId = transId;
-                }
-            }
-
-            this.transactionIdGenerator.set(maxTransactionId + 1);
-            this.externalLocations.addAll(recoveredExternalLocations);
-            logger.info("{} finished recovering records. Performing Checkpoint to ensure proper state of Partitions before updates", this);
-        } finally {
-            writeLock.unlock();
-        }
-        final long recoverNanos = System.nanoTime() - recoverStart;
-        final long recoveryMillis = TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS);
-        logger.info("Successfully recovered {} records in {} milliseconds", recordMap.size(), recoveryMillis);
-        checkpoint();
-
-        recovered = true;
-        return recordMap.values();
-    }
-
-    @Override
-    public Set<String> getRecoveredSwapLocations() throws IOException {
-        return recoveredExternalLocations;
-    }
-
-    private Long recoverFromSnapshot(final Map<Object, T> recordMap) throws IOException {
-        final boolean partialExists = Files.exists(partialPath);
-        final boolean snapshotExists = Files.exists(snapshotPath);
-
-        if (!partialExists && !snapshotExists) {
-            return null;
-        }
-
-        if (partialExists && snapshotExists) {
-            // both files exist -- assume we failed while checkpointing. Delete
-            // the partial file
-            Files.delete(partialPath);
-        } else if (partialExists) {
-            // partial exists but snapshot does not -- we must have completed
-            // creating the partial, deleted the snapshot
-            // but crashed before renaming the partial to the snapshot. Just
-            // rename partial to snapshot
-            Files.move(partialPath, snapshotPath);
-        }
-
-        if (Files.size(snapshotPath) == 0) {
-            logger.warn("{} Found 0-byte Snapshot file; skipping Snapshot file in recovery", this);
-            return null;
-        }
-
-        // at this point, we know the snapshotPath exists because if it didn't, then we either returned null
-        // or we renamed partialPath to snapshotPath. So just Recover from snapshotPath.
-        try (final DataInputStream dataIn = new DataInputStream(new BufferedInputStream(Files.newInputStream(snapshotPath, StandardOpenOption.READ)))) {
-            final String waliImplementationClass = dataIn.readUTF();
-            final int waliImplementationVersion = dataIn.readInt();
-
-            if (!waliImplementationClass.equals(MinimalLockingWriteAheadLog.class.getName())) {
-                throw new IOException("Write-Ahead Log located at " + snapshotPath + " was written using the " + waliImplementationClass + " class; cannot restore using " + getClass().getName());
-            }
-
-            if (waliImplementationVersion > getVersion()) {
-                throw new IOException("Write-Ahead Log located at " + snapshotPath + " was written using version " + waliImplementationVersion + " of the " + waliImplementationClass + " class; cannot restore using Version " + getVersion());
-            }
-
-            dataIn.readUTF(); // ignore serde class name for now
-            final int serdeVersion = dataIn.readInt();
-            final long maxTransactionId = dataIn.readLong();
-            final int numRecords = dataIn.readInt();
-
-            for (int i = 0; i < numRecords; i++) {
-                final T record = serde.deserializeRecord(dataIn, serdeVersion);
-                if (record == null) {
-                    throw new EOFException();
-                }
-
-                final UpdateType updateType = serde.getUpdateType(record);
-                if (updateType == UpdateType.DELETE) {
-                    logger.warn("While recovering from snapshot, found record with type 'DELETE'; this record will not be restored");
-                    continue;
-                }
-
-                logger.trace("Recovered from snapshot: {}", record);
-                recordMap.put(serde.getRecordIdentifier(record), record);
-            }
-
-            final int numSwapRecords = dataIn.readInt();
-            final Set<String> swapLocations = new HashSet<>();
-            for (int i = 0; i < numSwapRecords; i++) {
-                swapLocations.add(dataIn.readUTF());
-            }
-            this.recoveredExternalLocations.addAll(swapLocations);
-
-            logger.debug("{} restored {} Records and {} Swap Files from Snapshot, ending with Transaction ID {}", new Object[]{this, numRecords, recoveredExternalLocations.size(), maxTransactionId});
-            return maxTransactionId;
-        }
-    }
-
-    /**
-     * Recovers records from the edit logs via the Partitions. Returns a boolean
-     * if recovery of a Partition requires the Write-Ahead Log be checkpointed
-     * before modification.
-     *
-     * @param modifiableRecordMap
-     * @param maxTransactionIdRestored
-     * @return
-     * @throws IOException
-     */
-    private void recoverFromEdits(final Map<Object, T> modifiableRecordMap, final Long maxTransactionIdRestored) throws IOException {
-        final Map<Object, T> updateMap = new HashMap<>();
-        final Map<Object, T> unmodifiableRecordMap = Collections.unmodifiableMap(modifiableRecordMap);
-        final Map<Object, T> ignorableMap = new HashMap<>();
-        final Set<String> ignorableSwapLocations = new HashSet<>();
-
-        // populate a map of the next transaction id for each partition to the
-        // partition that has that next transaction id.
-        final SortedMap<Long, Partition<T>> transactionMap = new TreeMap<>();
-        for (final Partition<T> partition : partitions) {
-            Long transactionId;
-            boolean keepTransaction;
-            do {
-                transactionId = partition.getNextRecoverableTransactionId();
-
-                keepTransaction = transactionId == null || maxTransactionIdRestored == null || transactionId > maxTransactionIdRestored;
-                if (keepTransaction && transactionId != null) {
-                    // map this transaction id to its partition so that we can
-                    // start restoring transactions from this partition,
-                    // starting at 'transactionId'
-                    transactionMap.put(transactionId, partition);
-                } else if (transactionId != null) {
-                    // skip the next transaction, because our snapshot already
-                    // contained this transaction.
-                    try {
-                        partition.recoverNextTransaction(ignorableMap, updateMap, ignorableSwapLocations);
-                    } catch (final EOFException e) {
-                        logger.error("{} unexpectedly reached End of File while reading from {} for Transaction {}; assuming crash and ignoring this transaction.",
-                                new Object[]{this, partition, transactionId});
-                    }
-                }
-            } while (!keepTransaction);
-        }
-
-        while (!transactionMap.isEmpty()) {
-            final Map.Entry<Long, Partition<T>> firstEntry = transactionMap.entrySet().iterator().next();
-            final Long firstTransactionId = firstEntry.getKey();
-            final Partition<T> nextPartition = firstEntry.getValue();
-
-            try {
-                updateMap.clear();
-                final Set<Object> idsRemoved = nextPartition.recoverNextTransaction(unmodifiableRecordMap, updateMap, recoveredExternalLocations);
-                modifiableRecordMap.putAll(updateMap);
-                for (final Object id : idsRemoved) {
-                    modifiableRecordMap.remove(id);
-                }
-            } catch (final EOFException e) {
-                logger.error("{} unexpectedly reached End-of-File when reading from {} for Transaction ID {}; assuming crash and ignoring this transaction",
-                        new Object[]{this, nextPartition, firstTransactionId});
-            }
-
-            transactionMap.remove(firstTransactionId);
-
-            Long subsequentTransactionId = null;
-            try {
-                subsequentTransactionId = nextPartition.getNextRecoverableTransactionId();
-            } catch (final IOException e) {
-                logger.error("{} unexpectedly found End-of-File when reading from {} for Transaction ID {}; assuming crash and ignoring this transaction",
-                        new Object[]{this, nextPartition, firstTransactionId});
-            }
-
-            if (subsequentTransactionId != null) {
-                transactionMap.put(subsequentTransactionId, nextPartition);
-            }
-        }
-
-        for (final Partition<T> partition : partitions) {
-            partition.endRecovery();
-        }
-    }
-
-    @Override
-    public synchronized int checkpoint() throws IOException {
-        final Set<T> records;
-        final Set<String> swapLocations;
-        final long maxTransactionId;
-
-        final long startNanos = System.nanoTime();
-
-        FileOutputStream fileOut = null;
-        DataOutputStream dataOut = null;
-
-        long stopTheWorldNanos = -1L;
-        long stopTheWorldStart = -1L;
-        try {
-            writeLock.lock();
-            try {
-                stopTheWorldStart = System.nanoTime();
-                // stop the world while we make a copy of the records that must
-                // be checkpointed and rollover the partitions.
-                // We copy the records because serializing them is potentially
-                // very expensive, especially when we have hundreds
-                // of thousands or even millions of them. We don't want to
-                // prevent WALI from being used during this time.
-
-                // So the design is to copy all of the records, determine the
-                // last transaction ID that the records represent,
-                // and roll over the partitions to new write-ahead logs.
-                // Then, outside of the write lock, we will serialize the data
-                // to disk, and then remove the old Partition data.
-                records = new HashSet<>(recordMap.values());
-                maxTransactionId = transactionIdGenerator.get() - 1;
-
-                swapLocations = new HashSet<>(externalLocations);
-                for (final Partition<T> partition : partitions) {
-                    partition.rollover();
-                }
-
-                // notify global sync with the write lock held. We do this because we don't want the repository to get updated
-                // while the listener is performing its necessary tasks
-                if (syncListener != null) {
-                    syncListener.onGlobalSync();
-                }
-            } finally {
-                writeLock.unlock();
-            }
-
-            stopTheWorldNanos = System.nanoTime() - stopTheWorldStart;
-
-            // perform checkpoint, writing to .partial file
-            fileOut = new FileOutputStream(partialPath.toFile());
-            dataOut = new DataOutputStream(fileOut);
-            dataOut.writeUTF(MinimalLockingWriteAheadLog.class.getName());
-            dataOut.writeInt(getVersion());
-            dataOut.writeUTF(serde.getClass().getName());
-            dataOut.writeInt(serde.getVersion());
-            dataOut.writeLong(maxTransactionId);
-            dataOut.writeInt(records.size());
-
-            for (final T record : records) {
-                logger.trace("Checkpointing {}", record);
-                serde.serializeRecord(record, dataOut);
-            }
-
-            dataOut.writeInt(swapLocations.size());
-            for (final String swapLocation : swapLocations) {
-                dataOut.writeUTF(swapLocation);
-            }
-        } finally {
-            if (dataOut != null) {
-                try {
-                    dataOut.flush();
-                    fileOut.getFD().sync();
-                    dataOut.close();
-                } catch (final IOException e) {
-                    logger.warn("Failed to close Data Stream due to {}", e.toString(), e);
-                }
-            }
-        }
-
-        // delete the snapshot, if it exists, and rename the .partial to
-        // snapshot
-        Files.deleteIfExists(snapshotPath);
-        Files.move(partialPath, snapshotPath);
-
-        // clear all of the edit logs
-        final long partitionStart = System.nanoTime();
-        for (final Partition<T> partition : partitions) {
-            // we can call clearOld without claiming the partition because it
-            // does not change the partition's state
-            // and the only member variable it touches cannot be modified, other
-            // than when #rollover() is called.
-            // And since this method is the only one that calls #rollover() and
-            // this method is synchronized,
-            // the value of that member variable will not change. And it's
-            // volatile, so we will get the correct value.
-            partition.clearOld();
-        }
-        final long partitionEnd = System.nanoTime();
-        numberBlackListedPartitions.set(0);
-
-        final long endNanos = System.nanoTime();
-        final long millis = TimeUnit.MILLISECONDS.convert(endNanos - startNanos, TimeUnit.NANOSECONDS);
-        final long partitionMillis = TimeUnit.MILLISECONDS.convert(partitionEnd - partitionStart, TimeUnit.NANOSECONDS);
-        final long stopTheWorldMillis = TimeUnit.NANOSECONDS.toMillis(stopTheWorldNanos);
-
-        logger.info("{} checkpointed with {} Records and {} Swap Files in {} milliseconds (Stop-the-world time = {} milliseconds, Clear Edit Logs time = {} millis), max Transaction ID {}",
-                new Object[]{this, records.size(), swapLocations.size(), millis, stopTheWorldMillis, partitionMillis, maxTransactionId});
-
-        return records.size();
-    }
-
-    @Override
-    public void shutdown() throws IOException {
-        writeLock.lock();
-        try {
-            for (final Partition<T> partition : partitions) {
-                partition.close();
-            }
-        } finally {
-            writeLock.unlock();
-            lockChannel.close();
-        }
-    }
-
-    public int getVersion() {
-        return 1;
-    }
-
-    /**
-     * Represents a partition of this repository, which maps directly to a
-     * .journal file.
-     *
-     * All methods with the exceptions of {@link #claim()}, {@link #tryClaim()},
-     * and {@link #releaseClaim()} in this Partition MUST be called while
-     * holding the claim (via {@link #claim} or {@link #tryClaim()).
-     *
-     * @param <S>
-     */
-    private static class Partition<S> {
-
-        public static final String JOURNAL_EXTENSION = ".journal";
-        private static final Pattern JOURNAL_FILENAME_PATTERN = Pattern.compile("\\d+\\.journal");
-
-        private final SerDe<S> serde;
-
-        private final Path editDirectory;
-        private final int writeAheadLogVersion;
-
-        private final Lock lock = new ReentrantLock();
-        private DataOutputStream dataOut = null;
-        private FileOutputStream fileOut = null;
-        private boolean blackListed = false;
-        private boolean closed = false;
-        private DataInputStream recoveryIn;
-        private int recoveryVersion;
-        private String currentJournalFilename = "";
-
-        private static final byte TRANSACTION_CONTINUE = 1;
-        private static final byte TRANSACTION_COMMIT = 2;
-
-        private final String description;
-        private final AtomicLong maxTransactionId = new AtomicLong(-1L);
-        private final Logger logger = LoggerFactory.getLogger(MinimalLockingWriteAheadLog.class);
-
-        private final Queue<Path> recoveryFiles;
-
-        public Partition(final Path path, final SerDe<S> serde, final int partitionIndex, final int writeAheadLogVersion) throws IOException {
-            this.editDirectory = path;
-            this.serde = serde;
-
-            final File file = path.toFile();
-            if (!file.exists() && !file.mkdirs()) {
-                throw new IOException("Could not create directory " + file.getAbsolutePath());
-            }
-
-            this.recoveryFiles = new LinkedBlockingQueue<>();
-            for (final Path recoveryPath : getRecoveryPaths()) {
-                recoveryFiles.add(recoveryPath);
-            }
-
-            this.description = "Partition-" + partitionIndex;
-            this.writeAheadLogVersion = writeAheadLogVersion;
-        }
-
-        public boolean tryClaim() {
-            final boolean obtainedLock = lock.tryLock();
-            if (!obtainedLock) {
-                return false;
-            }
-
-            // Check if the partition is blacklisted. If so, unlock it and return false. Otherwise,
-            // leave it locked and return true, so that the caller will need to unlock.
-            if (blackListed) {
-                lock.unlock();
-                return false;
-            }
-
-            return true;
-        }
-
-        public void releaseClaim() {
-            lock.unlock();
-        }
-
-        public void close() {
-            final DataOutputStream out = dataOut;
-            if (out != null) {
-                try {
-                    out.close();
-                } catch (final Exception e) {
-
-                }
-            }
-
-            this.closed = true;
-            this.dataOut = null;
-        }
-
-        public void blackList() {
-            lock.lock();
-            try {
-                blackListed = true;
-            } finally {
-                lock.unlock();
-            }
-            logger.debug("Blacklisted {}", this);
-        }
-
-        /**
-         * Closes resources pointing to the current journal and begins writing
-         * to a new one
-         *
-         * @throws IOException
-         */
-        public void rollover() throws IOException {
-            lock.lock();
-            try {
-                final DataOutputStream out = dataOut;
-                if (out != null) {
-                    out.close();
-                }
-
-                final Path editPath = getNewEditPath();
-                final FileOutputStream fos = new FileOutputStream(editPath.toFile());
-                final DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(fos));
-                outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName());
-                outStream.writeInt(writeAheadLogVersion);
-                outStream.writeUTF(serde.getClass().getName());
-                outStream.writeInt(serde.getVersion());
-                outStream.flush();
-                dataOut = outStream;
-                fileOut = fos;
-
-                currentJournalFilename = editPath.toFile().getName();
-
-                blackListed = false;
-            } finally {
-                lock.unlock();
-            }
-        }
-
-        private long getJournalIndex(final File file) {
-            final String filename = file.getName();
-            final int dotIndex = filename.indexOf(".");
-            final String number = filename.substring(0, dotIndex);
-            return Long.parseLong(number);
-        }
-
-        private Path getNewEditPath() {
-            final List<Path> recoveryPaths = getRecoveryPaths();
-            final long newIndex;
-            if (recoveryPaths == null || recoveryPaths.isEmpty()) {
-                newIndex = 1;
-            } else {
-                final long lastFileIndex = getJournalIndex(recoveryPaths.get(recoveryPaths.size() - 1).toFile());
-                newIndex = lastFileIndex + 1;
-            }
-
-            return editDirectory.resolve(newIndex + JOURNAL_EXTENSION);
-        }
-
-        private List<Path> getRecoveryPaths() {
-            final List<Path> paths = new ArrayList<>();
-
-            final File directory = editDirectory.toFile();
-            final File[] partitionFiles = directory.listFiles();
-            if (partitionFiles == null) {
-                return paths;
-            }
-
-            for (final File file : partitionFiles) {
-                // if file is a journal file but no data has yet been persisted, it may
-                // very well be a 0-byte file (the journal is not SYNC'ed to disk after
-                // a header is written out, so it may be lost). In this case, the journal
-                // is empty, so we can just skip it.
-                if (file.isDirectory() || file.length() == 0L) {
-                    continue;
-                }
-
-                if (!JOURNAL_FILENAME_PATTERN.matcher(file.getName()).matches()) {
-                    continue;
-                }
-
-                if (isJournalFile(file)) {
-                    paths.add(file.toPath());
-                } else {
-                    logger.warn("Found file {}, but could not access it, or it was not in the expected format; will ignore this file", file.getAbsolutePath());
-                }
-            }
-
-            // Sort journal files by the numeric portion of the filename
-            Collections.sort(paths, new Comparator<Path>() {
-                @Override
-                public int compare(final Path o1, final Path o2) {
-                    if (o1 == null && o2 == null) {
-                        return 0;
-                    }
-                    if (o1 == null) {
-                        return 1;
-                    }
-                    if (o2 == null) {
-                        return -1;
-                    }
-
-                    final long index1 = getJournalIndex(o1.toFile());
-                    final long index2 = getJournalIndex(o2.toFile());
-                    return Long.compare(index1, index2);
-                }
-            });
-
-            return paths;
-        }
-
-        void clearOld() {
-            final List<Path> oldRecoveryFiles = getRecoveryPaths();
-
-            for (final Path path : oldRecoveryFiles) {
-                final File file = path.toFile();
-                if (file.getName().equals(currentJournalFilename)) {
-                    continue;
-                }
-                if (file.exists()) {
-                    file.delete();
-                }
-            }
-        }
-
-        private boolean isJournalFile(final File file) {
-            final String expectedStartsWith = MinimalLockingWriteAheadLog.class.getName();
-            try {
-                try (final FileInputStream fis = new FileInputStream(file);
-                        final InputStream bufferedIn = new BufferedInputStream(fis);
-                        final DataInputStream in = new DataInputStream(bufferedIn)) {
-                    final String waliImplClassName = in.readUTF();
-                    if (!expectedStartsWith.equals(waliImplClassName)) {
-                        return false;
-                    }
-                }
-            } catch (final IOException e) {
-                return false;
-            }
-
-            return true;
-        }
-
-        public void update(final Collection<S> records, final long transactionId, final Map<Object, S> recordMap, final boolean forceSync) throws IOException {
-            if (this.closed) {
-                throw new IllegalStateException("Partition is closed");
-            }
-
-            final DataOutputStream out = dataOut;
-            out.writeLong(transactionId);
-
-            final int numEditsToSerialize = records.size();
-            int editsSerialized = 0;
-            for (final S record : records) {
-                final Object recordId = serde.getRecordIdentifier(record);
-                final S previousVersion = recordMap.get(recordId);
-
-                serde.serializeEdit(previousVersion, record, out);
-                if (++editsSerialized < numEditsToSerialize) {
-                    out.write(TRANSACTION_CONTINUE);
-                } else {
-                    out.write(TRANSACTION_COMMIT);
-                }
-            }
-
-            out.flush();
-
-            if (forceSync) {
-                fileOut.getFD().sync();
-            }
-        }
-
-        private DataInputStream createDataInputStream(final Path path) throws IOException {
-            return new DataInputStream(new BufferedInputStream(Files.newInputStream(path)));
-        }
-
-        private DataInputStream getRecoveryStream() throws IOException {
-            if (recoveryIn != null && hasMoreData(recoveryIn)) {
-                return recoveryIn;
-            }
-
-            while (true) {
-                final Path nextRecoveryPath = recoveryFiles.poll();
-                if (nextRecoveryPath == null) {
-                    return null;
-                }
-
-                recoveryIn = createDataInputStream(nextRecoveryPath);
-                if (hasMoreData(recoveryIn)) {
-                    final String waliImplementationClass = recoveryIn.readUTF();
-                    if (!MinimalLockingWriteAheadLog.class.getName().equals(waliImplementationClass)) {
-                        continue;
-                    }
-
-                    final long waliVersion = recoveryIn.readInt();
-                    if (waliVersion > writeAheadLogVersion) {
-                        throw new IOException("Cannot recovery from file " + nextRecoveryPath + " because it was written using WALI version " + waliVersion + ", but the version used to restore it is only " + writeAheadLogVersion);
-                    }
-
-                    @SuppressWarnings("unused")
-                    final String serdeClassName = recoveryIn.readUTF();
-                    this.recoveryVersion = recoveryIn.readInt();
-
-                    break;
-                }
-            }
-
-            return recoveryIn;
-        }
-
-        public Long getNextRecoverableTransactionId() throws IOException {
-            while (true) {
-                DataInputStream recoveryStream = getRecoveryStream();
-                if (recoveryStream == null) {
-                    return null;
-                }
-
-                final long transactionId;
-                try {
-                    transactionId = recoveryIn.readLong();
-                } catch (final EOFException e) {
-                    continue;
-                }
-
-                this.maxTransactionId.set(transactionId);
-                return transactionId;
-            }
-        }
-
-        private boolean hasMoreData(final InputStream in) throws IOException {
-            in.mark(1);
-            final int nextByte = in.read();
-            in.reset();
-            return nextByte >= 0;
-        }
-
-        public void endRecovery() throws IOException {
-            if (recoveryIn != null) {
-                recoveryIn.close();
-            }
-
-            final Path nextRecoveryPath = this.recoveryFiles.poll();
-            if (nextRecoveryPath != null) {
-                throw new IllegalStateException("Signaled to end recovery, but there are more recovery files for Partition in directory " + editDirectory);
-            }
-
-            final Path newEditPath = getNewEditPath();
-
-            final FileOutputStream fos = new FileOutputStream(newEditPath.toFile());
-            final DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(fos));
-            outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName());
-            outStream.writeInt(writeAheadLogVersion);
-            outStream.writeUTF(serde.getClass().getName());
-            outStream.writeInt(serde.getVersion());
-            outStream.flush();
-            dataOut = outStream;
-            fileOut = fos;
-        }
-
-        public Set<Object> recoverNextTransaction(final Map<Object, S> currentRecordMap, final Map<Object, S> updatedRecordMap, final Set<String> swapLocations) throws IOException {
-            final Set<Object> idsRemoved = new HashSet<>();
-
-            int transactionFlag;
-            do {
-                final S record = serde.deserializeEdit(recoveryIn, currentRecordMap, recoveryVersion);
-                if (logger.isTraceEnabled()) {
-                    logger.trace("{} Recovering Transaction {}: {}", new Object[]{this, maxTransactionId.get(), record});
-                }
-
-                final Object recordId = serde.getRecordIdentifier(record);
-                final UpdateType updateType = serde.getUpdateType(record);
-                if (updateType == UpdateType.DELETE) {
-                    updatedRecordMap.remove(recordId);
-                    idsRemoved.add(recordId);
-                } else if (updateType == UpdateType.SWAP_IN) {
-                    final String location = serde.getLocation(record);
-                    if (location == null) {
-                        logger.error("Recovered SWAP_IN record from edit log, but it did not contain a Location; skipping record");
-                    } else {
-                        swapLocations.remove(location);
-                        updatedRecordMap.put(recordId, record);
-                        idsRemoved.remove(recordId);
-                    }
-                } else if (updateType == UpdateType.SWAP_OUT) {
-                    final String location = serde.getLocation(record);
-                    if (location == null) {
-                        logger.error("Recovered SWAP_OUT record from edit log, but it did not contain a Location; skipping record");
-                    } else {
-                        swapLocations.add(location);
-                        updatedRecordMap.remove(recordId);
-                        idsRemoved.add(recordId);
-                    }
-                } else {
-                    updatedRecordMap.put(recordId, record);
-                    idsRemoved.remove(recordId);
-                }
-
-                transactionFlag = recoveryIn.read();
-            } while (transactionFlag != TRANSACTION_COMMIT);
-
-            return idsRemoved;
-        }
-
-        /**
-         * Must be called after recovery has finished
-         *
-         * @return
-         */
-        public long getMaxRecoveredTransactionId() {
-            return maxTransactionId.get();
-        }
-
-        @Override
-        public String toString() {
-            return description;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/src/main/java/org/wali/SerDe.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/main/java/org/wali/SerDe.java b/commons/wali/src/main/java/org/wali/SerDe.java
deleted file mode 100644
index bbc7efb..0000000
--- a/commons/wali/src/main/java/org/wali/SerDe.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wali;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * A mechanism for Serializing and De-Serializing a Record of a given Type
- *
- * @param <T> the type of record that is to be Serialized and De-Serialized by
- * this object
- */
-public interface SerDe<T> {
-
-    /**
-     * <p>
-     * Serializes an Edit Record to the log via the given
-     * {@link DataOutputStream}.
-     * </p>
-     *
-     * @param previousRecordState
-     * @param newRecordState
-     * @param out
-     * @throws IOException
-     */
-    void serializeEdit(T previousRecordState, T newRecordState, DataOutputStream out) throws IOException;
-
-    /**
-     * <p>
-     * Serializes a Record in a form suitable for a Snapshot via the given
-     * {@link DataOutputStream}.
-     * </p>
-     *
-     * @param record
-     * @param out
-     * @throws IOException
-     */
-    void serializeRecord(T record, DataOutputStream out) throws IOException;
-
-    /**
-     * <p>
-     * Reads an Edit Record from the given {@link DataInputStream} and merges
-     * that edit with the current version of the record, returning the new,
-     * merged version. If the Edit Record indicates that the entity was deleted,
-     * must return a Record with an UpdateType of {@link UpdateType#DELETE}.
-     * This method must never return <code>null</code>.
-     * </p>
-     *
-     * @param in
-     * @param currentRecordStates an unmodifiable map of Record ID's to the
-     * current state of that record
-     * @param version the version of the SerDe that was used to serialize the
-     * edit record
-     * @return
-     * @throws IOException
-     */
-    T deserializeEdit(DataInputStream in, Map<Object, T> currentRecordStates, int version) throws IOException;
-
-    /**
-     * <p>
-     * Reads a Record from the given {@link DataInputStream} and returns this
-     * record. If no data is available, returns <code>null</code>.
-     * </p>
-     *
-     * @param in
-     * @param version the version of the SerDe that was used to serialize the
-     * record
-     * @return
-     * @throws IOException
-     */
-    T deserializeRecord(DataInputStream in, int version) throws IOException;
-
-    /**
-     * Returns the unique ID for the given record
-     *
-     * @param record
-     * @return
-     */
-    Object getRecordIdentifier(T record);
-
-    /**
-     * Returns the UpdateType for the given record
-     *
-     * @param record
-     * @return
-     */
-    UpdateType getUpdateType(T record);
-
-    /**
-     * Returns the external location of the given record; this is used when a
-     * record is moved away from WALI or is being re-introduced to WALI. For
-     * example, WALI can be updated with a record of type
-     * {@link UpdateType#SWAP_OUT} that indicates a Location of
-     * file://tmp/external1 and can then be re-introduced to WALI by updating
-     * WALI with a record of type {@link UpdateType#CREATE} that indicates a
-     * Location of file://tmp/external1
-     *
-     * @param record
-     * @return
-     */
-    String getLocation(T record);
-
-    /**
-     * Returns the version that this SerDe will use when writing. This used used
-     * when serializing/deserializing the edit logs so that if the version
-     * changes, we are still able to deserialize old versions
-     *
-     * @return
-     */
-    int getVersion();
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/src/main/java/org/wali/SyncListener.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/main/java/org/wali/SyncListener.java b/commons/wali/src/main/java/org/wali/SyncListener.java
deleted file mode 100644
index ffb11ca..0000000
--- a/commons/wali/src/main/java/org/wali/SyncListener.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wali;
-
-/**
- * <p>
- * Provides a callback mechanism by which applicable listeners can be notified
- * when a WriteAheadRepository is synched (via the
- * {@link WriteAheadRepository#sync()} method) or one of its partitions is
- * synched via
- * {@link WriteAheadRepository#update(java.util.Collection, boolean)} with a
- * value of <code>true</code> for the second argument.
- * </p>
- *
- * <p>
- * It is not required that an implementation of {@link WriteAheadRepository}
- * support this interface. Those that do generally will require that the
- * listener be injected via the constructor.
- * </p>
- *
- * <p>
- * All implementations of this interface must be thread-safe.
- * </p>
- *
- * <p>
- * The {@link #onSync(int)} method will always be called while the associated
- * partition is locked. The {@link #onGlobalSync()} will always be called while
- * the entire repository is locked.
- * </p>
- *
- */
-public interface SyncListener {
-
-    /**
-     * This method is called whenever a specific partition is synched via the
-     * {@link WriteAheadRepository#update(java.util.Collection, boolean)} method
-     *
-     * @param partitionIndex the index of the partition that was synched
-     */
-    void onSync(int partitionIndex);
-
-    /**
-     * This method is called whenever the entire
-     * <code>WriteAheadRepository</code> is synched via the
-     * {@link WriteAheadRepository#sync()} method.
-     */
-    void onGlobalSync();
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/src/main/java/org/wali/UpdateType.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/main/java/org/wali/UpdateType.java b/commons/wali/src/main/java/org/wali/UpdateType.java
deleted file mode 100644
index 1b039f8..0000000
--- a/commons/wali/src/main/java/org/wali/UpdateType.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wali;
-
-/**
- * <p>
- * Enumerates the valid types of things that can cause a
- * {@link WriteAheadRepository} to update its state</p>
- */
-public enum UpdateType {
-
-    /**
-     * Used when a new Record has been created
-     */
-    CREATE,
-    /**
-     * Used when a Record has been updated in some way
-     */
-    UPDATE,
-    /**
-     * Used to indicate that a Record has been deleted and should be removed
-     * from the Repository
-     */
-    DELETE,
-    /**
-     * Used to indicate that a Record still exists but has been moved elsewhere,
-     * so that it is no longer maintained by the WALI instance
-     */
-    SWAP_OUT,
-    /**
-     * Used to indicate that a Record that was previously Swapped Out is now
-     * being Swapped In
-     */
-    SWAP_IN;
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/src/main/java/org/wali/WriteAheadRepository.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/main/java/org/wali/WriteAheadRepository.java b/commons/wali/src/main/java/org/wali/WriteAheadRepository.java
deleted file mode 100644
index 4567872..0000000
--- a/commons/wali/src/main/java/org/wali/WriteAheadRepository.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wali;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Set;
-
-/**
- * <p>
- * A WriteAheadRepository is used to persist state that is otherwise kept
- * in-memory. The Repository does not provide any query capability except to
- * allow the data to be recovered upon restart of the system.
- * </p>
- *
- * <p>
- * A WriteAheadRepository operates by writing every update to an Edit Log. On
- * restart, the data can be recovered by replaying all of the updates that are
- * found in the Edit Log. This can, however, eventually result in very large
- * Edit Logs, which can both take up massive amounts of disk space and take a
- * long time to recover. In order to prevent this, the Repository provides a
- * Checkpointing capability. This allows the current in-memory state of the
- * Repository to be flushed to disk and the Edit Log to be deleted, thereby
- * compacting the amount of space required to store the Repository. After a
- * Checkpoint is performed, modifications are again written to an Edit Log. At
- * this point, when the system is to be restored, it is restored by first
- * loading the Checkpointed version of the Repository and then replaying the
- * Edit Log.
- * </p>
- *
- * <p>
- * All implementations of <code>WriteAheadRepository</code> use one or more
- * partitions to manage their Edit Logs. An implementation may require exactly
- * one partition or may allow many partitions.
- * </p>
- *
- * @param <T>
- */
-public interface WriteAheadRepository<T> {
-
-    /**
-     * <p>
-     * Updates the repository with the specified Records. The Collection must
-     * not contain multiple records with the same ID
-     * </p>
-     *
-     * @param records the records to update
-     * @param forceSync specifies whether or not the Repository forces the data
-     * to be flushed to disk. If false, the data may be stored in Operating
-     * System buffers, which improves performance but could cause loss of data
-     * if power is lost or the Operating System crashes
-     * @throws IOException
-     * @throws IllegalArgumentException if multiple records within the given
-     * Collection have the same ID, as specified by {@link Record#getId()}
-     * method
-     *
-     * @return the index of the Partition that performed the update
-     */
-    int update(Collection<T> records, boolean forceSync) throws IOException;
-
-    /**
-     * <p>
-     * Recovers all records from the persisted state. This method must be called
-     * before any updates are issued to the Repository.
-     * </p>
-     *
-     * @return
-     * @throws IOException
-     * @throws IllegalStateException if any updates have been issued against
-     * this Repository before this method is invoked
-     */
-    Collection<T> recoverRecords() throws IOException;
-
-    /**
-     * <p>
-     * Recovers all External Swap locations that were persisted. If this method
-     * is to be called, it must be called AFTER {@link #recoverRecords()} and
-     * BEFORE {@link update}.
-     * </p>
-     *
-     * @return
-     * @throws IOException
-     */
-    Set<String> getRecoveredSwapLocations() throws IOException;
-
-    /**
-     * <p>
-     * Compacts the contents of the Repository so that rather than having a
-     * Snapshot and an Edit Log indicating many Updates to the Snapshot, the
-     * Snapshot is updated to contain the current state of the Repository, and
-     * the edit log is purged.
-     * </p>
-     *
-     *
-     * @return the number of records that were written to the new snapshot
-     * @throws java.io.IOException
-     */
-    int checkpoint() throws IOException;
-
-    /**
-     * <p>
-     * Causes the repository to checkpoint and then close any open resources.
-     * </p>
-     *
-     * @throws IOException
-     */
-    void shutdown() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/src/test/java/org/wali/DummyRecord.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/test/java/org/wali/DummyRecord.java b/commons/wali/src/test/java/org/wali/DummyRecord.java
deleted file mode 100644
index e0f7f96..0000000
--- a/commons/wali/src/test/java/org/wali/DummyRecord.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wali;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-public class DummyRecord {
-
-    private final String id;
-    private final Map<String, String> props;
-    private final UpdateType updateType;
-
-    public DummyRecord(final String id, final UpdateType updateType) {
-        this.id = id;
-        this.props = new HashMap<>();
-        this.updateType = updateType;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public UpdateType getUpdateType() {
-        return updateType;
-    }
-
-    public DummyRecord setProperties(final Map<String, String> props) {
-        this.props.clear();
-        this.props.putAll(props);
-        return this;
-    }
-
-    public DummyRecord setProperty(final String name, final String value) {
-        this.props.put(name, value);
-        return this;
-    }
-
-    public Map<String, String> getProperties() {
-        return Collections.unmodifiableMap(this.props);
-    }
-
-    public String getProperty(final String name) {
-        return props.get(name);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/src/test/java/org/wali/DummyRecordSerde.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/test/java/org/wali/DummyRecordSerde.java b/commons/wali/src/test/java/org/wali/DummyRecordSerde.java
deleted file mode 100644
index 8cc7860..0000000
--- a/commons/wali/src/test/java/org/wali/DummyRecordSerde.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wali;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Map;
-
-public class DummyRecordSerde implements SerDe<DummyRecord> {
-
-    public static final int NUM_UPDATE_TYPES = UpdateType.values().length;
-    private int throwIOEAfterNserializeEdits = -1;
-    private int serializeEditCount = 0;
-
-    @Override
-    public void serializeEdit(final DummyRecord previousState, final DummyRecord record, final DataOutputStream out) throws IOException {
-        if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= throwIOEAfterNserializeEdits)) {
-            throw new IOException("Serialized " + (serializeEditCount - 1) + " records successfully, so now it's time to throw IOE");
-        }
-
-        out.write(record.getUpdateType().ordinal());
-        out.writeUTF(record.getId());
-
-        if (record.getUpdateType() != UpdateType.DELETE) {
-            final Map<String, String> props = record.getProperties();
-            out.writeInt(props.size());
-            for (final Map.Entry<String, String> entry : props.entrySet()) {
-                out.writeUTF(entry.getKey());
-                out.writeUTF(entry.getValue());
-            }
-        }
-    }
-
-    @Override
-    public void serializeRecord(final DummyRecord record, final DataOutputStream out) throws IOException {
-        serializeEdit(null, record, out);
-    }
-
-    @Override
-    public DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
-        final int index = in.read();
-        if (index < 0) {
-            throw new EOFException();
-        }
-        if (index >= NUM_UPDATE_TYPES) {
-            throw new IOException("Corrupt stream; got UpdateType value of " + index + " but there are only " + NUM_UPDATE_TYPES + " valid values");
-        }
-        final UpdateType updateType = UpdateType.values()[index];
-        final String id = in.readUTF();
-        final DummyRecord record = new DummyRecord(id, updateType);
-
-        if (record.getUpdateType() != UpdateType.DELETE) {
-            final int numProps = in.readInt();
-            for (int i = 0; i < numProps; i++) {
-                final String key = in.readUTF();
-                final String value = in.readUTF();
-                record.setProperty(key, value);
-            }
-        }
-        return record;
-    }
-
-    @Override
-    public Object getRecordIdentifier(final DummyRecord record) {
-        return record.getId();
-    }
-
-    @Override
-    public UpdateType getUpdateType(final DummyRecord record) {
-        return record.getUpdateType();
-    }
-
-    @Override
-    public DummyRecord deserializeEdit(final DataInputStream in, final Map<Object, DummyRecord> currentVersion, final int version) throws IOException {
-        return deserializeRecord(in, version);
-    }
-
-    @Override
-    public int getVersion() {
-        return 1;
-    }
-
-    public void setThrowIOEAfterNSerializeEdits(final int n) {
-        this.throwIOEAfterNserializeEdits = n;
-    }
-
-    @Override
-    public String getLocation(final DummyRecord record) {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
deleted file mode 100644
index 57f3495..0000000
--- a/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wali;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestMinimalLockingWriteAheadLog {
-
-    @Test
-    public void testWrite() throws IOException, InterruptedException {
-        final int numPartitions = 8;
-
-        final Path path = Paths.get("target/minimal-locking-repo");
-        deleteRecursively(path.toFile());
-        assertTrue(path.toFile().mkdirs());
-
-        final DummyRecordSerde serde = new DummyRecordSerde();
-        final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
-        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
-        assertTrue(initialRecs.isEmpty());
-
-        final List<InsertThread> threads = new ArrayList<>();
-        for (int i = 0; i < 10; i++) {
-            threads.add(new InsertThread(10000, 1000000 * i, repo));
-        }
-
-        final long start = System.nanoTime();
-        for (final InsertThread thread : threads) {
-            thread.start();
-        }
-        for (final InsertThread thread : threads) {
-            thread.join();
-        }
-        final long nanos = System.nanoTime() - start;
-        final long millis = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
-        System.out.println("Took " + millis + " millis to insert 1,000,000 records each in its own transaction");
-        repo.shutdown();
-
-        final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
-        final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
-        assertFalse(recoveredRecords.isEmpty());
-        assertEquals(100000, recoveredRecords.size());
-        for (final DummyRecord record : recoveredRecords) {
-            final Map<String, String> recoveredProps = record.getProperties();
-            assertEquals(1, recoveredProps.size());
-            assertEquals("B", recoveredProps.get("A"));
-        }
-    }
-
-    @Test
-    public void testRecoverAfterIOException() throws IOException {
-        final int numPartitions = 5;
-        final Path path = Paths.get("target/minimal-locking-repo-test-recover-after-ioe");
-        deleteRecursively(path.toFile());
-        Files.createDirectories(path);
-
-        final DummyRecordSerde serde = new DummyRecordSerde();
-        final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
-        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
-        assertTrue(initialRecs.isEmpty());
-
-        serde.setThrowIOEAfterNSerializeEdits(7);   // serialize the 2 transactions, then the first edit of the third transaction; then throw IOException
-
-        final List<DummyRecord> firstTransaction = new ArrayList<>();
-        firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
-        firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
-        firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
-
-        final List<DummyRecord> secondTransaction = new ArrayList<>();
-        secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
-        secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
-        secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));
-
-        final List<DummyRecord> thirdTransaction = new ArrayList<>();
-        thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
-        thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
-
-        repo.update(firstTransaction, true);
-        repo.update(secondTransaction, true);
-        try {
-            repo.update(thirdTransaction, true);
-            Assert.fail("Did not throw IOException on third transaction");
-        } catch (final IOException e) {
-            // expected behavior.
-        }
-
-        repo.shutdown();
-
-        serde.setThrowIOEAfterNSerializeEdits(-1);
-        final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
-        final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
-        assertFalse(recoveredRecords.isEmpty());
-        assertEquals(3, recoveredRecords.size());
-
-        boolean record1 = false, record2 = false, record3 = false;
-        for (final DummyRecord record : recoveredRecords) {
-            switch (record.getId()) {
-                case "1":
-                    record1 = true;
-                    assertEquals("123", record.getProperty("abc"));
-                    break;
-                case "2":
-                    record2 = true;
-                    assertEquals("123", record.getProperty("cba"));
-                    break;
-                case "3":
-                    record3 = true;
-                    assertEquals("123", record.getProperty("aaa"));
-                    break;
-            }
-        }
-
-        assertTrue(record1);
-        assertTrue(record2);
-        assertTrue(record3);
-    }
-
-    @Test
-    public void testCannotModifyLogAfterAllAreBlackListed() throws IOException {
-        final int numPartitions = 5;
-        final Path path = Paths.get("target/minimal-locking-repo-test-cannot-modify-after-all-blacklisted");
-        deleteRecursively(path.toFile());
-        Files.createDirectories(path);
-
-        final DummyRecordSerde serde = new DummyRecordSerde();
-        final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
-        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
-        assertTrue(initialRecs.isEmpty());
-
-        serde.setThrowIOEAfterNSerializeEdits(3);   // serialize the first transaction, then fail on all subsequent transactions
-
-        final List<DummyRecord> firstTransaction = new ArrayList<>();
-        firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
-        firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
-        firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
-
-        final List<DummyRecord> secondTransaction = new ArrayList<>();
-        secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
-        secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
-        secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));
-
-        final List<DummyRecord> thirdTransaction = new ArrayList<>();
-        thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
-        thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
-
-        repo.update(firstTransaction, true);
-
-        try {
-            repo.update(secondTransaction, true);
-            Assert.fail("Did not throw IOException on second transaction");
-        } catch (final IOException e) {
-            // expected behavior.
-        }
-
-        for (int i = 0; i < 4; i++) {
-            try {
-                repo.update(thirdTransaction, true);
-                Assert.fail("Did not throw IOException on third transaction");
-            } catch (final IOException e) {
-                // expected behavior.
-            }
-        }
-
-        serde.setThrowIOEAfterNSerializeEdits(-1);
-        final List<DummyRecord> fourthTransaction = new ArrayList<>();
-        fourthTransaction.add(new DummyRecord("1", UpdateType.DELETE));
-
-        try {
-            repo.update(fourthTransaction, true);
-            Assert.fail("Successfully updated repo for 4th transaction");
-        } catch (final IOException e) {
-            // expected behavior
-            assertTrue(e.getMessage().contains("All Partitions have been blacklisted"));
-        }
-
-        repo.shutdown();
-        serde.setThrowIOEAfterNSerializeEdits(-1);
-
-        final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
-        final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
-        assertFalse(recoveredRecords.isEmpty());
-        assertEquals(3, recoveredRecords.size());
-    }
-
-    @Test
-    public void testStriping() throws IOException {
-        final int numPartitions = 6;
-        final Path path = Paths.get("target/minimal-locking-repo-striped");
-        deleteRecursively(path.toFile());
-        Files.createDirectories(path);
-
-        final SortedSet<Path> paths = new TreeSet<>();
-        paths.add(path.resolve("stripe-1"));
-        paths.add(path.resolve("stripe-2"));
-
-        final DummyRecordSerde serde = new DummyRecordSerde();
-        final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(paths, numPartitions, serde, null);
-        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
-        assertTrue(initialRecs.isEmpty());
-
-        final InsertThread inserter = new InsertThread(100000, 0, repo);
-        inserter.run();
-
-        for (final Path partitionPath : paths) {
-            final File[] files = partitionPath.toFile().listFiles(new FileFilter() {
-                @Override
-                public boolean accept(File pathname) {
-                    return pathname.getName().startsWith("partition");
-                }
-            });
-            assertEquals(3, files.length);
-
-            for (final File file : files) {
-                final File[] journalFiles = file.listFiles();
-                assertEquals(1, journalFiles.length);
-            }
-        }
-
-        repo.checkpoint();
-
-    }
-
-    private static class InsertThread extends Thread {
-
-        private final List<List<DummyRecord>> records;
-        private final WriteAheadRepository<DummyRecord> repo;
-
-        public InsertThread(final int numInsertions, final int startIndex, final WriteAheadRepository<DummyRecord> repo) {
-            records = new ArrayList<>();
-            for (int i = 0; i < numInsertions; i++) {
-                final DummyRecord record = new DummyRecord(String.valueOf(i + startIndex), UpdateType.CREATE);
-                record.setProperty("A", "B");
-                final List<DummyRecord> list = new ArrayList<>();
-                list.add(record);
-                records.add(list);
-            }
-            this.repo = repo;
-        }
-
-        @Override
-        public void run() {
-            try {
-                int counter = 0;
-                for (final List<DummyRecord> list : records) {
-                    final boolean forceSync = (++counter == records.size());
-                    repo.update(list, forceSync);
-                }
-            } catch (IOException e) {
-                Assert.fail("Failed to update: " + e.toString());
-                e.printStackTrace();
-            }
-        }
-    }
-
-    private void deleteRecursively(final File file) {
-        final File[] children = file.listFiles();
-        if (children != null) {
-            for (final File child : children) {
-                deleteRecursively(child);
-            }
-        }
-
-        file.delete();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/maven-plugins/nar-maven-plugin/pom.xml
----------------------------------------------------------------------
diff --git a/maven-plugins/nar-maven-plugin/pom.xml b/maven-plugins/nar-maven-plugin/pom.xml
new file mode 100644
index 0000000..c0236fd
--- /dev/null
+++ b/maven-plugins/nar-maven-plugin/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>maven-plugins</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <artifactId>nar-maven-plugin</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <packaging>maven-plugin</packaging>
+    <name>Apache NiFi NAR Plugin</name>
+    <description>Apache NiFi Nar Plugin. It is currently a part of the Apache Incubator.</description>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-plugin-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>default-descriptor</id>
+                        <goals>
+                            <goal>descriptor</goal>
+                        </goals>
+                        <phase>process-classes</phase>
+                    </execution>
+                    <execution>
+                        <id>help-descriptor</id>
+                        <goals>
+                            <goal>helpmojo</goal>
+                        </goals>
+                        <phase>process-classes</phase>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+    <dependencies>                    
+        <dependency>          
+            <groupId>org.apache.maven</groupId>
+            <artifactId>maven-plugin-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-dependency-plugin</artifactId>
+            <type>maven-plugin</type>
+        </dependency>
+        <dependency>
+            <!-- No code from maven-jar-plugin is actually used; it's included
+            just to simplify the dependencies list.                     -->
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-jar-plugin</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.maven.plugin-tools</groupId>
+            <artifactId>maven-plugin-annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>