You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/19 19:16:09 UTC
[35/59] [abbrv] [partial] incubator-nifi git commit: Reworked overall
directory structure to make releasing nifi vs maven plugins easier
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/pom.xml
----------------------------------------------------------------------
diff --git a/commons/wali/pom.xml b/commons/wali/pom.xml
deleted file mode 100644
index 347c8cc..0000000
--- a/commons/wali/pom.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-commons-parent</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </parent>
-
- <artifactId>wali</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <packaging>jar</packaging>
-
- <name>WALI : Write-Ahead Log Implementation</name>
-
- <dependencies>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-utils</artifactId>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
deleted file mode 100644
index 19208d3..0000000
--- a/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ /dev/null
@@ -1,1008 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wali;
-
-import static java.util.Objects.requireNonNull;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p>
- * This implementation provides as little Locking as possible in order to
- * provide the highest throughput possible. However, this implementation is ONLY
- * appropriate if it can be guaranteed that only a single thread will ever issue
- * updates for a given Record at any one time.
- * </p>
- *
- * @param <T>
- */
-public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepository<T> {
-
- private final Path basePath;
- private final Path partialPath;
- private final Path snapshotPath;
-
- private final SerDe<T> serde;
- private final SyncListener syncListener;
- private final FileChannel lockChannel;
- private final AtomicLong transactionIdGenerator = new AtomicLong(0L);
-
- private final Partition<T>[] partitions;
- private final AtomicLong partitionIndex = new AtomicLong(0L);
- private final ConcurrentMap<Object, T> recordMap = new ConcurrentHashMap<>();
- private final Map<Object, T> unmodifiableRecordMap = Collections.unmodifiableMap(recordMap);
- private final Set<String> externalLocations = new CopyOnWriteArraySet<>();
-
- private final Set<String> recoveredExternalLocations = new CopyOnWriteArraySet<>();
-
- private final AtomicInteger numberBlackListedPartitions = new AtomicInteger(0);
-
- private static final Logger logger = LoggerFactory.getLogger(MinimalLockingWriteAheadLog.class);
-
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
- private final Lock readLock = rwLock.readLock(); // required to update a partition
- private final Lock writeLock = rwLock.writeLock(); // required for checkpoint
-
- private volatile boolean updated = false;
- private volatile boolean recovered = false;
-
- public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
- this(new TreeSet<>(Collections.singleton(path)), partitionCount, serde, syncListener);
- }
-
- /**
- *
- * @param paths a sorted set of Paths to use for the partitions/journals and
- * the snapshot. The snapshot will always be written to the first path
- * specified.
- *
- * @param partitionCount the number of partitions/journals to use. For best
- * performance, this should be close to the number of threads that are
- * expected to update the repository simultaneously
- *
- * @param serde
- * @param syncListener
- * @throws IOException
- */
- @SuppressWarnings("unchecked")
- public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
- this.syncListener = syncListener;
-
- requireNonNull(paths);
- requireNonNull(serde);
-
- if (paths.isEmpty()) {
- throw new IllegalArgumentException("Paths must be non-empty");
- }
-
- int existingPartitions = 0;
- for (final Path path : paths) {
- if (!Files.exists(path)) {
- Files.createDirectories(path);
- }
-
- final File file = path.toFile();
- if (!file.isDirectory()) {
- throw new IOException("Path given [" + path + "] is not a directory");
- }
- if (!file.canWrite()) {
- throw new IOException("Path given [" + path + "] is not writable");
- }
- if (!file.canRead()) {
- throw new IOException("Path given [" + path + "] is not readable");
- }
- if (!file.canExecute()) {
- throw new IOException("Path given [" + path + "] is not executable");
- }
-
- final File[] children = file.listFiles();
- if (children != null) {
- for (final File child : children) {
- if (child.isDirectory() && child.getName().startsWith("partition-")) {
- existingPartitions++;
- }
- }
-
- if (existingPartitions != 0 && existingPartitions != partitionCount) {
- logger.warn("Constructing MinimalLockingWriteAheadLog with partitionCount={}, but the repository currently has "
- + "{} partitions; ignoring argument and proceeding with {} partitions",
- new Object[]{partitionCount, existingPartitions, existingPartitions});
- }
- }
- }
-
- this.basePath = paths.iterator().next();
- this.partialPath = basePath.resolve("snapshot.partial");
- this.snapshotPath = basePath.resolve("snapshot");
- this.serde = serde;
-
- final Path lockPath = basePath.resolve("wali.lock");
- lockChannel = new FileOutputStream(lockPath.toFile()).getChannel();
- lockChannel.lock();
-
- partitions = new Partition[partitionCount];
-
- Iterator<Path> pathIterator = paths.iterator();
- for (int i = 0; i < partitionCount; i++) {
- // If we're out of paths, create a new iterator to start over.
- if (!pathIterator.hasNext()) {
- pathIterator = paths.iterator();
- }
-
- final Path partitionBasePath = pathIterator.next();
-
- partitions[i] = new Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, getVersion());
- }
- }
-
- @Override
- public int update(final Collection<T> records, final boolean forceSync) throws IOException {
- if (!recovered) {
- throw new IllegalStateException("Cannot update repository until record recovery has been performed");
- }
-
- if (records.isEmpty()) {
- return -1;
- }
-
- updated = true;
- readLock.lock();
- try {
- while (true) {
- final int numBlackListed = numberBlackListedPartitions.get();
- if (numBlackListed >= partitions.length) {
- throw new IOException("All Partitions have been blacklisted due to failures when attempting to update. If the Write-Ahead Log is able to perform a checkpoint, this issue may resolve itself. Otherwise, manual intervention will be required.");
- }
-
- final long partitionIdx = partitionIndex.getAndIncrement();
- final int resolvedIdx = (int) (partitionIdx % partitions.length);
- final Partition<T> partition = partitions[resolvedIdx];
- if (partition.tryClaim()) {
- try {
- final long transactionId = transactionIdGenerator.getAndIncrement();
- if (logger.isTraceEnabled()) {
- for (final T record : records) {
- logger.trace("Partition {} performing Transaction {}: {}", new Object[]{partition, transactionId, record});
- }
- }
-
- try {
- partition.update(records, transactionId, unmodifiableRecordMap, forceSync);
- } catch (final Exception e) {
- partition.blackList();
- numberBlackListedPartitions.incrementAndGet();
- throw e;
- }
-
- if (forceSync && syncListener != null) {
- syncListener.onSync(resolvedIdx);
- }
- } finally {
- partition.releaseClaim();
- }
-
- for (final T record : records) {
- final UpdateType updateType = serde.getUpdateType(record);
- final Object recordIdentifier = serde.getRecordIdentifier(record);
-
- if (updateType == UpdateType.DELETE) {
- recordMap.remove(recordIdentifier);
- } else if (updateType == UpdateType.SWAP_OUT) {
- final String newLocation = serde.getLocation(record);
- if (newLocation == null) {
- logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but no indicator of where the Record is to be Swapped Out to; these records may be lost when the repository is restored!");
- } else {
- recordMap.remove(recordIdentifier);
- this.externalLocations.add(newLocation);
- }
- } else if (updateType == UpdateType.SWAP_IN) {
- final String newLocation = serde.getLocation(record);
- if (newLocation == null) {
- logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but no indicator of where the Record is to be Swapped In from; these records may be duplicated when the repository is restored!");
- } else {
- externalLocations.remove(newLocation);
- }
- recordMap.put(recordIdentifier, record);
- } else {
- recordMap.put(recordIdentifier, record);
- }
- }
-
- return resolvedIdx;
- }
- }
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public Collection<T> recoverRecords() throws IOException {
- if (updated) {
- throw new IllegalStateException("Cannot recover records after updating the repository; must call recoverRecords first");
- }
-
- final long recoverStart = System.nanoTime();
- writeLock.lock();
- try {
- Long maxTransactionId = recoverFromSnapshot(recordMap);
- recoverFromEdits(recordMap, maxTransactionId);
-
- for (final Partition<T> partition : partitions) {
- final long transId = partition.getMaxRecoveredTransactionId();
- if (maxTransactionId == null || transId > maxTransactionId) {
- maxTransactionId = transId;
- }
- }
-
- this.transactionIdGenerator.set(maxTransactionId + 1);
- this.externalLocations.addAll(recoveredExternalLocations);
- logger.info("{} finished recovering records. Performing Checkpoint to ensure proper state of Partitions before updates", this);
- } finally {
- writeLock.unlock();
- }
- final long recoverNanos = System.nanoTime() - recoverStart;
- final long recoveryMillis = TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS);
- logger.info("Successfully recovered {} records in {} milliseconds", recordMap.size(), recoveryMillis);
- checkpoint();
-
- recovered = true;
- return recordMap.values();
- }
-
- @Override
- public Set<String> getRecoveredSwapLocations() throws IOException {
- return recoveredExternalLocations;
- }
-
- private Long recoverFromSnapshot(final Map<Object, T> recordMap) throws IOException {
- final boolean partialExists = Files.exists(partialPath);
- final boolean snapshotExists = Files.exists(snapshotPath);
-
- if (!partialExists && !snapshotExists) {
- return null;
- }
-
- if (partialExists && snapshotExists) {
- // both files exist -- assume we failed while checkpointing. Delete
- // the partial file
- Files.delete(partialPath);
- } else if (partialExists) {
- // partial exists but snapshot does not -- we must have completed
- // creating the partial, deleted the snapshot
- // but crashed before renaming the partial to the snapshot. Just
- // rename partial to snapshot
- Files.move(partialPath, snapshotPath);
- }
-
- if (Files.size(snapshotPath) == 0) {
- logger.warn("{} Found 0-byte Snapshot file; skipping Snapshot file in recovery", this);
- return null;
- }
-
- // at this point, we know the snapshotPath exists because if it didn't, then we either returned null
- // or we renamed partialPath to snapshotPath. So just Recover from snapshotPath.
- try (final DataInputStream dataIn = new DataInputStream(new BufferedInputStream(Files.newInputStream(snapshotPath, StandardOpenOption.READ)))) {
- final String waliImplementationClass = dataIn.readUTF();
- final int waliImplementationVersion = dataIn.readInt();
-
- if (!waliImplementationClass.equals(MinimalLockingWriteAheadLog.class.getName())) {
- throw new IOException("Write-Ahead Log located at " + snapshotPath + " was written using the " + waliImplementationClass + " class; cannot restore using " + getClass().getName());
- }
-
- if (waliImplementationVersion > getVersion()) {
- throw new IOException("Write-Ahead Log located at " + snapshotPath + " was written using version " + waliImplementationVersion + " of the " + waliImplementationClass + " class; cannot restore using Version " + getVersion());
- }
-
- dataIn.readUTF(); // ignore serde class name for now
- final int serdeVersion = dataIn.readInt();
- final long maxTransactionId = dataIn.readLong();
- final int numRecords = dataIn.readInt();
-
- for (int i = 0; i < numRecords; i++) {
- final T record = serde.deserializeRecord(dataIn, serdeVersion);
- if (record == null) {
- throw new EOFException();
- }
-
- final UpdateType updateType = serde.getUpdateType(record);
- if (updateType == UpdateType.DELETE) {
- logger.warn("While recovering from snapshot, found record with type 'DELETE'; this record will not be restored");
- continue;
- }
-
- logger.trace("Recovered from snapshot: {}", record);
- recordMap.put(serde.getRecordIdentifier(record), record);
- }
-
- final int numSwapRecords = dataIn.readInt();
- final Set<String> swapLocations = new HashSet<>();
- for (int i = 0; i < numSwapRecords; i++) {
- swapLocations.add(dataIn.readUTF());
- }
- this.recoveredExternalLocations.addAll(swapLocations);
-
- logger.debug("{} restored {} Records and {} Swap Files from Snapshot, ending with Transaction ID {}", new Object[]{this, numRecords, recoveredExternalLocations.size(), maxTransactionId});
- return maxTransactionId;
- }
- }
-
- /**
- * Recovers records from the edit logs via the Partitions. Returns a boolean
- * if recovery of a Partition requires the Write-Ahead Log be checkpointed
- * before modification.
- *
- * @param modifiableRecordMap
- * @param maxTransactionIdRestored
- * @return
- * @throws IOException
- */
- private void recoverFromEdits(final Map<Object, T> modifiableRecordMap, final Long maxTransactionIdRestored) throws IOException {
- final Map<Object, T> updateMap = new HashMap<>();
- final Map<Object, T> unmodifiableRecordMap = Collections.unmodifiableMap(modifiableRecordMap);
- final Map<Object, T> ignorableMap = new HashMap<>();
- final Set<String> ignorableSwapLocations = new HashSet<>();
-
- // populate a map of the next transaction id for each partition to the
- // partition that has that next transaction id.
- final SortedMap<Long, Partition<T>> transactionMap = new TreeMap<>();
- for (final Partition<T> partition : partitions) {
- Long transactionId;
- boolean keepTransaction;
- do {
- transactionId = partition.getNextRecoverableTransactionId();
-
- keepTransaction = transactionId == null || maxTransactionIdRestored == null || transactionId > maxTransactionIdRestored;
- if (keepTransaction && transactionId != null) {
- // map this transaction id to its partition so that we can
- // start restoring transactions from this partition,
- // starting at 'transactionId'
- transactionMap.put(transactionId, partition);
- } else if (transactionId != null) {
- // skip the next transaction, because our snapshot already
- // contained this transaction.
- try {
- partition.recoverNextTransaction(ignorableMap, updateMap, ignorableSwapLocations);
- } catch (final EOFException e) {
- logger.error("{} unexpectedly reached End of File while reading from {} for Transaction {}; assuming crash and ignoring this transaction.",
- new Object[]{this, partition, transactionId});
- }
- }
- } while (!keepTransaction);
- }
-
- while (!transactionMap.isEmpty()) {
- final Map.Entry<Long, Partition<T>> firstEntry = transactionMap.entrySet().iterator().next();
- final Long firstTransactionId = firstEntry.getKey();
- final Partition<T> nextPartition = firstEntry.getValue();
-
- try {
- updateMap.clear();
- final Set<Object> idsRemoved = nextPartition.recoverNextTransaction(unmodifiableRecordMap, updateMap, recoveredExternalLocations);
- modifiableRecordMap.putAll(updateMap);
- for (final Object id : idsRemoved) {
- modifiableRecordMap.remove(id);
- }
- } catch (final EOFException e) {
- logger.error("{} unexpectedly reached End-of-File when reading from {} for Transaction ID {}; assuming crash and ignoring this transaction",
- new Object[]{this, nextPartition, firstTransactionId});
- }
-
- transactionMap.remove(firstTransactionId);
-
- Long subsequentTransactionId = null;
- try {
- subsequentTransactionId = nextPartition.getNextRecoverableTransactionId();
- } catch (final IOException e) {
- logger.error("{} unexpectedly found End-of-File when reading from {} for Transaction ID {}; assuming crash and ignoring this transaction",
- new Object[]{this, nextPartition, firstTransactionId});
- }
-
- if (subsequentTransactionId != null) {
- transactionMap.put(subsequentTransactionId, nextPartition);
- }
- }
-
- for (final Partition<T> partition : partitions) {
- partition.endRecovery();
- }
- }
-
- @Override
- public synchronized int checkpoint() throws IOException {
- final Set<T> records;
- final Set<String> swapLocations;
- final long maxTransactionId;
-
- final long startNanos = System.nanoTime();
-
- FileOutputStream fileOut = null;
- DataOutputStream dataOut = null;
-
- long stopTheWorldNanos = -1L;
- long stopTheWorldStart = -1L;
- try {
- writeLock.lock();
- try {
- stopTheWorldStart = System.nanoTime();
- // stop the world while we make a copy of the records that must
- // be checkpointed and rollover the partitions.
- // We copy the records because serializing them is potentially
- // very expensive, especially when we have hundreds
- // of thousands or even millions of them. We don't want to
- // prevent WALI from being used during this time.
-
- // So the design is to copy all of the records, determine the
- // last transaction ID that the records represent,
- // and roll over the partitions to new write-ahead logs.
- // Then, outside of the write lock, we will serialize the data
- // to disk, and then remove the old Partition data.
- records = new HashSet<>(recordMap.values());
- maxTransactionId = transactionIdGenerator.get() - 1;
-
- swapLocations = new HashSet<>(externalLocations);
- for (final Partition<T> partition : partitions) {
- partition.rollover();
- }
-
- // notify global sync with the write lock held. We do this because we don't want the repository to get updated
- // while the listener is performing its necessary tasks
- if (syncListener != null) {
- syncListener.onGlobalSync();
- }
- } finally {
- writeLock.unlock();
- }
-
- stopTheWorldNanos = System.nanoTime() - stopTheWorldStart;
-
- // perform checkpoint, writing to .partial file
- fileOut = new FileOutputStream(partialPath.toFile());
- dataOut = new DataOutputStream(fileOut);
- dataOut.writeUTF(MinimalLockingWriteAheadLog.class.getName());
- dataOut.writeInt(getVersion());
- dataOut.writeUTF(serde.getClass().getName());
- dataOut.writeInt(serde.getVersion());
- dataOut.writeLong(maxTransactionId);
- dataOut.writeInt(records.size());
-
- for (final T record : records) {
- logger.trace("Checkpointing {}", record);
- serde.serializeRecord(record, dataOut);
- }
-
- dataOut.writeInt(swapLocations.size());
- for (final String swapLocation : swapLocations) {
- dataOut.writeUTF(swapLocation);
- }
- } finally {
- if (dataOut != null) {
- try {
- dataOut.flush();
- fileOut.getFD().sync();
- dataOut.close();
- } catch (final IOException e) {
- logger.warn("Failed to close Data Stream due to {}", e.toString(), e);
- }
- }
- }
-
- // delete the snapshot, if it exists, and rename the .partial to
- // snapshot
- Files.deleteIfExists(snapshotPath);
- Files.move(partialPath, snapshotPath);
-
- // clear all of the edit logs
- final long partitionStart = System.nanoTime();
- for (final Partition<T> partition : partitions) {
- // we can call clearOld without claiming the partition because it
- // does not change the partition's state
- // and the only member variable it touches cannot be modified, other
- // than when #rollover() is called.
- // And since this method is the only one that calls #rollover() and
- // this method is synchronized,
- // the value of that member variable will not change. And it's
- // volatile, so we will get the correct value.
- partition.clearOld();
- }
- final long partitionEnd = System.nanoTime();
- numberBlackListedPartitions.set(0);
-
- final long endNanos = System.nanoTime();
- final long millis = TimeUnit.MILLISECONDS.convert(endNanos - startNanos, TimeUnit.NANOSECONDS);
- final long partitionMillis = TimeUnit.MILLISECONDS.convert(partitionEnd - partitionStart, TimeUnit.NANOSECONDS);
- final long stopTheWorldMillis = TimeUnit.NANOSECONDS.toMillis(stopTheWorldNanos);
-
- logger.info("{} checkpointed with {} Records and {} Swap Files in {} milliseconds (Stop-the-world time = {} milliseconds, Clear Edit Logs time = {} millis), max Transaction ID {}",
- new Object[]{this, records.size(), swapLocations.size(), millis, stopTheWorldMillis, partitionMillis, maxTransactionId});
-
- return records.size();
- }
-
- @Override
- public void shutdown() throws IOException {
- writeLock.lock();
- try {
- for (final Partition<T> partition : partitions) {
- partition.close();
- }
- } finally {
- writeLock.unlock();
- lockChannel.close();
- }
- }
-
- public int getVersion() {
- return 1;
- }
-
- /**
- * Represents a partition of this repository, which maps directly to a
- * .journal file.
- *
- * All methods with the exceptions of {@link #claim()}, {@link #tryClaim()},
- * and {@link #releaseClaim()} in this Partition MUST be called while
- * holding the claim (via {@link #claim} or {@link #tryClaim()).
- *
- * @param <S>
- */
- private static class Partition<S> {
-
- public static final String JOURNAL_EXTENSION = ".journal";
- private static final Pattern JOURNAL_FILENAME_PATTERN = Pattern.compile("\\d+\\.journal");
-
- private final SerDe<S> serde;
-
- private final Path editDirectory;
- private final int writeAheadLogVersion;
-
- private final Lock lock = new ReentrantLock();
- private DataOutputStream dataOut = null;
- private FileOutputStream fileOut = null;
- private boolean blackListed = false;
- private boolean closed = false;
- private DataInputStream recoveryIn;
- private int recoveryVersion;
- private String currentJournalFilename = "";
-
- private static final byte TRANSACTION_CONTINUE = 1;
- private static final byte TRANSACTION_COMMIT = 2;
-
- private final String description;
- private final AtomicLong maxTransactionId = new AtomicLong(-1L);
- private final Logger logger = LoggerFactory.getLogger(MinimalLockingWriteAheadLog.class);
-
- private final Queue<Path> recoveryFiles;
-
- public Partition(final Path path, final SerDe<S> serde, final int partitionIndex, final int writeAheadLogVersion) throws IOException {
- this.editDirectory = path;
- this.serde = serde;
-
- final File file = path.toFile();
- if (!file.exists() && !file.mkdirs()) {
- throw new IOException("Could not create directory " + file.getAbsolutePath());
- }
-
- this.recoveryFiles = new LinkedBlockingQueue<>();
- for (final Path recoveryPath : getRecoveryPaths()) {
- recoveryFiles.add(recoveryPath);
- }
-
- this.description = "Partition-" + partitionIndex;
- this.writeAheadLogVersion = writeAheadLogVersion;
- }
-
- public boolean tryClaim() {
- final boolean obtainedLock = lock.tryLock();
- if (!obtainedLock) {
- return false;
- }
-
- // Check if the partition is blacklisted. If so, unlock it and return false. Otherwise,
- // leave it locked and return true, so that the caller will need to unlock.
- if (blackListed) {
- lock.unlock();
- return false;
- }
-
- return true;
- }
-
- public void releaseClaim() {
- lock.unlock();
- }
-
- public void close() {
- final DataOutputStream out = dataOut;
- if (out != null) {
- try {
- out.close();
- } catch (final Exception e) {
-
- }
- }
-
- this.closed = true;
- this.dataOut = null;
- }
-
- public void blackList() {
- lock.lock();
- try {
- blackListed = true;
- } finally {
- lock.unlock();
- }
- logger.debug("Blacklisted {}", this);
- }
-
- /**
- * Closes resources pointing to the current journal and begins writing
- * to a new one
- *
- * @throws IOException
- */
- public void rollover() throws IOException {
- lock.lock();
- try {
- final DataOutputStream out = dataOut;
- if (out != null) {
- out.close();
- }
-
- final Path editPath = getNewEditPath();
- final FileOutputStream fos = new FileOutputStream(editPath.toFile());
- final DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(fos));
- outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName());
- outStream.writeInt(writeAheadLogVersion);
- outStream.writeUTF(serde.getClass().getName());
- outStream.writeInt(serde.getVersion());
- outStream.flush();
- dataOut = outStream;
- fileOut = fos;
-
- currentJournalFilename = editPath.toFile().getName();
-
- blackListed = false;
- } finally {
- lock.unlock();
- }
- }
-
- private long getJournalIndex(final File file) {
- final String filename = file.getName();
- final int dotIndex = filename.indexOf(".");
- final String number = filename.substring(0, dotIndex);
- return Long.parseLong(number);
- }
-
- private Path getNewEditPath() {
- final List<Path> recoveryPaths = getRecoveryPaths();
- final long newIndex;
- if (recoveryPaths == null || recoveryPaths.isEmpty()) {
- newIndex = 1;
- } else {
- final long lastFileIndex = getJournalIndex(recoveryPaths.get(recoveryPaths.size() - 1).toFile());
- newIndex = lastFileIndex + 1;
- }
-
- return editDirectory.resolve(newIndex + JOURNAL_EXTENSION);
- }
-
- private List<Path> getRecoveryPaths() {
- final List<Path> paths = new ArrayList<>();
-
- final File directory = editDirectory.toFile();
- final File[] partitionFiles = directory.listFiles();
- if (partitionFiles == null) {
- return paths;
- }
-
- for (final File file : partitionFiles) {
- // if file is a journal file but no data has yet been persisted, it may
- // very well be a 0-byte file (the journal is not SYNC'ed to disk after
- // a header is written out, so it may be lost). In this case, the journal
- // is empty, so we can just skip it.
- if (file.isDirectory() || file.length() == 0L) {
- continue;
- }
-
- if (!JOURNAL_FILENAME_PATTERN.matcher(file.getName()).matches()) {
- continue;
- }
-
- if (isJournalFile(file)) {
- paths.add(file.toPath());
- } else {
- logger.warn("Found file {}, but could not access it, or it was not in the expected format; will ignore this file", file.getAbsolutePath());
- }
- }
-
- // Sort journal files by the numeric portion of the filename
- Collections.sort(paths, new Comparator<Path>() {
- @Override
- public int compare(final Path o1, final Path o2) {
- if (o1 == null && o2 == null) {
- return 0;
- }
- if (o1 == null) {
- return 1;
- }
- if (o2 == null) {
- return -1;
- }
-
- final long index1 = getJournalIndex(o1.toFile());
- final long index2 = getJournalIndex(o2.toFile());
- return Long.compare(index1, index2);
- }
- });
-
- return paths;
- }
-
- void clearOld() {
- final List<Path> oldRecoveryFiles = getRecoveryPaths();
-
- for (final Path path : oldRecoveryFiles) {
- final File file = path.toFile();
- if (file.getName().equals(currentJournalFilename)) {
- continue;
- }
- if (file.exists()) {
- file.delete();
- }
- }
- }
-
- private boolean isJournalFile(final File file) {
- final String expectedStartsWith = MinimalLockingWriteAheadLog.class.getName();
- try {
- try (final FileInputStream fis = new FileInputStream(file);
- final InputStream bufferedIn = new BufferedInputStream(fis);
- final DataInputStream in = new DataInputStream(bufferedIn)) {
- final String waliImplClassName = in.readUTF();
- if (!expectedStartsWith.equals(waliImplClassName)) {
- return false;
- }
- }
- } catch (final IOException e) {
- return false;
- }
-
- return true;
- }
-
- public void update(final Collection<S> records, final long transactionId, final Map<Object, S> recordMap, final boolean forceSync) throws IOException {
- if (this.closed) {
- throw new IllegalStateException("Partition is closed");
- }
-
- final DataOutputStream out = dataOut;
- out.writeLong(transactionId);
-
- final int numEditsToSerialize = records.size();
- int editsSerialized = 0;
- for (final S record : records) {
- final Object recordId = serde.getRecordIdentifier(record);
- final S previousVersion = recordMap.get(recordId);
-
- serde.serializeEdit(previousVersion, record, out);
- if (++editsSerialized < numEditsToSerialize) {
- out.write(TRANSACTION_CONTINUE);
- } else {
- out.write(TRANSACTION_COMMIT);
- }
- }
-
- out.flush();
-
- if (forceSync) {
- fileOut.getFD().sync();
- }
- }
-
- private DataInputStream createDataInputStream(final Path path) throws IOException {
- return new DataInputStream(new BufferedInputStream(Files.newInputStream(path)));
- }
-
- private DataInputStream getRecoveryStream() throws IOException {
- if (recoveryIn != null && hasMoreData(recoveryIn)) {
- return recoveryIn;
- }
-
- while (true) {
- final Path nextRecoveryPath = recoveryFiles.poll();
- if (nextRecoveryPath == null) {
- return null;
- }
-
- recoveryIn = createDataInputStream(nextRecoveryPath);
- if (hasMoreData(recoveryIn)) {
- final String waliImplementationClass = recoveryIn.readUTF();
- if (!MinimalLockingWriteAheadLog.class.getName().equals(waliImplementationClass)) {
- continue;
- }
-
- final long waliVersion = recoveryIn.readInt();
- if (waliVersion > writeAheadLogVersion) {
- throw new IOException("Cannot recovery from file " + nextRecoveryPath + " because it was written using WALI version " + waliVersion + ", but the version used to restore it is only " + writeAheadLogVersion);
- }
-
- @SuppressWarnings("unused")
- final String serdeClassName = recoveryIn.readUTF();
- this.recoveryVersion = recoveryIn.readInt();
-
- break;
- }
- }
-
- return recoveryIn;
- }
-
- public Long getNextRecoverableTransactionId() throws IOException {
- while (true) {
- DataInputStream recoveryStream = getRecoveryStream();
- if (recoveryStream == null) {
- return null;
- }
-
- final long transactionId;
- try {
- transactionId = recoveryIn.readLong();
- } catch (final EOFException e) {
- continue;
- }
-
- this.maxTransactionId.set(transactionId);
- return transactionId;
- }
- }
-
- private boolean hasMoreData(final InputStream in) throws IOException {
- in.mark(1);
- final int nextByte = in.read();
- in.reset();
- return nextByte >= 0;
- }
-
- public void endRecovery() throws IOException {
- if (recoveryIn != null) {
- recoveryIn.close();
- }
-
- final Path nextRecoveryPath = this.recoveryFiles.poll();
- if (nextRecoveryPath != null) {
- throw new IllegalStateException("Signaled to end recovery, but there are more recovery files for Partition in directory " + editDirectory);
- }
-
- final Path newEditPath = getNewEditPath();
-
- final FileOutputStream fos = new FileOutputStream(newEditPath.toFile());
- final DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(fos));
- outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName());
- outStream.writeInt(writeAheadLogVersion);
- outStream.writeUTF(serde.getClass().getName());
- outStream.writeInt(serde.getVersion());
- outStream.flush();
- dataOut = outStream;
- fileOut = fos;
- }
-
- public Set<Object> recoverNextTransaction(final Map<Object, S> currentRecordMap, final Map<Object, S> updatedRecordMap, final Set<String> swapLocations) throws IOException {
- final Set<Object> idsRemoved = new HashSet<>();
-
- int transactionFlag;
- do {
- final S record = serde.deserializeEdit(recoveryIn, currentRecordMap, recoveryVersion);
- if (logger.isTraceEnabled()) {
- logger.trace("{} Recovering Transaction {}: {}", new Object[]{this, maxTransactionId.get(), record});
- }
-
- final Object recordId = serde.getRecordIdentifier(record);
- final UpdateType updateType = serde.getUpdateType(record);
- if (updateType == UpdateType.DELETE) {
- updatedRecordMap.remove(recordId);
- idsRemoved.add(recordId);
- } else if (updateType == UpdateType.SWAP_IN) {
- final String location = serde.getLocation(record);
- if (location == null) {
- logger.error("Recovered SWAP_IN record from edit log, but it did not contain a Location; skipping record");
- } else {
- swapLocations.remove(location);
- updatedRecordMap.put(recordId, record);
- idsRemoved.remove(recordId);
- }
- } else if (updateType == UpdateType.SWAP_OUT) {
- final String location = serde.getLocation(record);
- if (location == null) {
- logger.error("Recovered SWAP_OUT record from edit log, but it did not contain a Location; skipping record");
- } else {
- swapLocations.add(location);
- updatedRecordMap.remove(recordId);
- idsRemoved.add(recordId);
- }
- } else {
- updatedRecordMap.put(recordId, record);
- idsRemoved.remove(recordId);
- }
-
- transactionFlag = recoveryIn.read();
- } while (transactionFlag != TRANSACTION_COMMIT);
-
- return idsRemoved;
- }
-
- /**
- * Must be called after recovery has finished
- *
- * @return
- */
- public long getMaxRecoveredTransactionId() {
- return maxTransactionId.get();
- }
-
- @Override
- public String toString() {
- return description;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/src/main/java/org/wali/SerDe.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/main/java/org/wali/SerDe.java b/commons/wali/src/main/java/org/wali/SerDe.java
deleted file mode 100644
index bbc7efb..0000000
--- a/commons/wali/src/main/java/org/wali/SerDe.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wali;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * A mechanism for Serializing and De-Serializing a Record of a given Type
- *
- * @param <T> the type of record that is to be Serialized and De-Serialized by
- * this object
- */
-public interface SerDe<T> {
-
- /**
- * <p>
- * Serializes an Edit Record to the log via the given
- * {@link DataOutputStream}.
- * </p>
- *
- * @param previousRecordState
- * @param newRecordState
- * @param out
- * @throws IOException
- */
- void serializeEdit(T previousRecordState, T newRecordState, DataOutputStream out) throws IOException;
-
- /**
- * <p>
- * Serializes a Record in a form suitable for a Snapshot via the given
- * {@link DataOutputStream}.
- * </p>
- *
- * @param record
- * @param out
- * @throws IOException
- */
- void serializeRecord(T record, DataOutputStream out) throws IOException;
-
- /**
- * <p>
- * Reads an Edit Record from the given {@link DataInputStream} and merges
- * that edit with the current version of the record, returning the new,
- * merged version. If the Edit Record indicates that the entity was deleted,
- * must return a Record with an UpdateType of {@link UpdateType#DELETE}.
- * This method must never return <code>null</code>.
- * </p>
- *
- * @param in
- * @param currentRecordStates an unmodifiable map of Record ID's to the
- * current state of that record
- * @param version the version of the SerDe that was used to serialize the
- * edit record
- * @return
- * @throws IOException
- */
- T deserializeEdit(DataInputStream in, Map<Object, T> currentRecordStates, int version) throws IOException;
-
- /**
- * <p>
- * Reads a Record from the given {@link DataInputStream} and returns this
- * record. If no data is available, returns <code>null</code>.
- * </p>
- *
- * @param in
- * @param version the version of the SerDe that was used to serialize the
- * record
- * @return
- * @throws IOException
- */
- T deserializeRecord(DataInputStream in, int version) throws IOException;
-
- /**
- * Returns the unique ID for the given record
- *
- * @param record
- * @return
- */
- Object getRecordIdentifier(T record);
-
- /**
- * Returns the UpdateType for the given record
- *
- * @param record
- * @return
- */
- UpdateType getUpdateType(T record);
-
- /**
- * Returns the external location of the given record; this is used when a
- * record is moved away from WALI or is being re-introduced to WALI. For
- * example, WALI can be updated with a record of type
- * {@link UpdateType#SWAP_OUT} that indicates a Location of
- * file://tmp/external1 and can then be re-introduced to WALI by updating
- * WALI with a record of type {@link UpdateType#CREATE} that indicates a
- * Location of file://tmp/external1
- *
- * @param record
- * @return
- */
- String getLocation(T record);
-
- /**
- * Returns the version that this SerDe will use when writing. This used used
- * when serializing/deserializing the edit logs so that if the version
- * changes, we are still able to deserialize old versions
- *
- * @return
- */
- int getVersion();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/src/main/java/org/wali/SyncListener.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/main/java/org/wali/SyncListener.java b/commons/wali/src/main/java/org/wali/SyncListener.java
deleted file mode 100644
index ffb11ca..0000000
--- a/commons/wali/src/main/java/org/wali/SyncListener.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wali;
-
-/**
- * <p>
- * Provides a callback mechanism by which applicable listeners can be notified
- * when a WriteAheadRepository is synched (via the
- * {@link WriteAheadRepository#sync()} method) or one of its partitions is
- * synched via
- * {@link WriteAheadRepository#update(java.util.Collection, boolean)} with a
- * value of <code>true</code> for the second argument.
- * </p>
- *
- * <p>
- * It is not required that an implementation of {@link WriteAheadRepository}
- * support this interface. Those that do generally will require that the
- * listener be injected via the constructor.
- * </p>
- *
- * <p>
- * All implementations of this interface must be thread-safe.
- * </p>
- *
- * <p>
- * The {@link #onSync(int)} method will always be called while the associated
- * partition is locked. The {@link #onGlobalSync()} will always be called while
- * the entire repository is locked.
- * </p>
- *
- */
-public interface SyncListener {
-
- /**
- * This method is called whenever a specific partition is synched via the
- * {@link WriteAheadRepository#update(java.util.Collection, boolean)} method
- *
- * @param partitionIndex the index of the partition that was synched
- */
- void onSync(int partitionIndex);
-
- /**
- * This method is called whenever the entire
- * <code>WriteAheadRepository</code> is synched via the
- * {@link WriteAheadRepository#sync()} method.
- */
- void onGlobalSync();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/src/main/java/org/wali/UpdateType.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/main/java/org/wali/UpdateType.java b/commons/wali/src/main/java/org/wali/UpdateType.java
deleted file mode 100644
index 1b039f8..0000000
--- a/commons/wali/src/main/java/org/wali/UpdateType.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wali;
-
-/**
- * <p>
- * Enumerates the valid types of things that can cause a
- * {@link WriteAheadRepository} to update its state</p>
- */
-public enum UpdateType {
-
- /**
- * Used when a new Record has been created
- */
- CREATE,
- /**
- * Used when a Record has been updated in some way
- */
- UPDATE,
- /**
- * Used to indicate that a Record has been deleted and should be removed
- * from the Repository
- */
- DELETE,
- /**
- * Used to indicate that a Record still exists but has been moved elsewhere,
- * so that it is no longer maintained by the WALI instance
- */
- SWAP_OUT,
- /**
- * Used to indicate that a Record that was previously Swapped Out is now
- * being Swapped In
- */
- SWAP_IN;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/src/main/java/org/wali/WriteAheadRepository.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/main/java/org/wali/WriteAheadRepository.java b/commons/wali/src/main/java/org/wali/WriteAheadRepository.java
deleted file mode 100644
index 4567872..0000000
--- a/commons/wali/src/main/java/org/wali/WriteAheadRepository.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wali;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Set;
-
-/**
- * <p>
- * A WriteAheadRepository is used to persist state that is otherwise kept
- * in-memory. The Repository does not provide any query capability except to
- * allow the data to be recovered upon restart of the system.
- * </p>
- *
- * <p>
- * A WriteAheadRepository operates by writing every update to an Edit Log. On
- * restart, the data can be recovered by replaying all of the updates that are
- * found in the Edit Log. This can, however, eventually result in very large
- * Edit Logs, which can both take up massive amounts of disk space and take a
- * long time to recover. In order to prevent this, the Repository provides a
- * Checkpointing capability. This allows the current in-memory state of the
- * Repository to be flushed to disk and the Edit Log to be deleted, thereby
- * compacting the amount of space required to store the Repository. After a
- * Checkpoint is performed, modifications are again written to an Edit Log. At
- * this point, when the system is to be restored, it is restored by first
- * loading the Checkpointed version of the Repository and then replaying the
- * Edit Log.
- * </p>
- *
- * <p>
- * All implementations of <code>WriteAheadRepository</code> use one or more
- * partitions to manage their Edit Logs. An implementation may require exactly
- * one partition or may allow many partitions.
- * </p>
- *
- * @param <T>
- */
-public interface WriteAheadRepository<T> {
-
- /**
- * <p>
- * Updates the repository with the specified Records. The Collection must
- * not contain multiple records with the same ID
- * </p>
- *
- * @param records the records to update
- * @param forceSync specifies whether or not the Repository forces the data
- * to be flushed to disk. If false, the data may be stored in Operating
- * System buffers, which improves performance but could cause loss of data
- * if power is lost or the Operating System crashes
- * @throws IOException
- * @throws IllegalArgumentException if multiple records within the given
- * Collection have the same ID, as specified by {@link Record#getId()}
- * method
- *
- * @return the index of the Partition that performed the update
- */
- int update(Collection<T> records, boolean forceSync) throws IOException;
-
- /**
- * <p>
- * Recovers all records from the persisted state. This method must be called
- * before any updates are issued to the Repository.
- * </p>
- *
- * @return
- * @throws IOException
- * @throws IllegalStateException if any updates have been issued against
- * this Repository before this method is invoked
- */
- Collection<T> recoverRecords() throws IOException;
-
- /**
- * <p>
- * Recovers all External Swap locations that were persisted. If this method
- * is to be called, it must be called AFTER {@link #recoverRecords()} and
- * BEFORE {@link update}.
- * </p>
- *
- * @return
- * @throws IOException
- */
- Set<String> getRecoveredSwapLocations() throws IOException;
-
- /**
- * <p>
- * Compacts the contents of the Repository so that rather than having a
- * Snapshot and an Edit Log indicating many Updates to the Snapshot, the
- * Snapshot is updated to contain the current state of the Repository, and
- * the edit log is purged.
- * </p>
- *
- *
- * @return the number of records that were written to the new snapshot
- * @throws java.io.IOException
- */
- int checkpoint() throws IOException;
-
- /**
- * <p>
- * Causes the repository to checkpoint and then close any open resources.
- * </p>
- *
- * @throws IOException
- */
- void shutdown() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/src/test/java/org/wali/DummyRecord.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/test/java/org/wali/DummyRecord.java b/commons/wali/src/test/java/org/wali/DummyRecord.java
deleted file mode 100644
index e0f7f96..0000000
--- a/commons/wali/src/test/java/org/wali/DummyRecord.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wali;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-public class DummyRecord {
-
- private final String id;
- private final Map<String, String> props;
- private final UpdateType updateType;
-
- public DummyRecord(final String id, final UpdateType updateType) {
- this.id = id;
- this.props = new HashMap<>();
- this.updateType = updateType;
- }
-
- public String getId() {
- return id;
- }
-
- public UpdateType getUpdateType() {
- return updateType;
- }
-
- public DummyRecord setProperties(final Map<String, String> props) {
- this.props.clear();
- this.props.putAll(props);
- return this;
- }
-
- public DummyRecord setProperty(final String name, final String value) {
- this.props.put(name, value);
- return this;
- }
-
- public Map<String, String> getProperties() {
- return Collections.unmodifiableMap(this.props);
- }
-
- public String getProperty(final String name) {
- return props.get(name);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/src/test/java/org/wali/DummyRecordSerde.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/test/java/org/wali/DummyRecordSerde.java b/commons/wali/src/test/java/org/wali/DummyRecordSerde.java
deleted file mode 100644
index 8cc7860..0000000
--- a/commons/wali/src/test/java/org/wali/DummyRecordSerde.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wali;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Map;
-
-public class DummyRecordSerde implements SerDe<DummyRecord> {
-
- public static final int NUM_UPDATE_TYPES = UpdateType.values().length;
- private int throwIOEAfterNserializeEdits = -1;
- private int serializeEditCount = 0;
-
- @Override
- public void serializeEdit(final DummyRecord previousState, final DummyRecord record, final DataOutputStream out) throws IOException {
- if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= throwIOEAfterNserializeEdits)) {
- throw new IOException("Serialized " + (serializeEditCount - 1) + " records successfully, so now it's time to throw IOE");
- }
-
- out.write(record.getUpdateType().ordinal());
- out.writeUTF(record.getId());
-
- if (record.getUpdateType() != UpdateType.DELETE) {
- final Map<String, String> props = record.getProperties();
- out.writeInt(props.size());
- for (final Map.Entry<String, String> entry : props.entrySet()) {
- out.writeUTF(entry.getKey());
- out.writeUTF(entry.getValue());
- }
- }
- }
-
- @Override
- public void serializeRecord(final DummyRecord record, final DataOutputStream out) throws IOException {
- serializeEdit(null, record, out);
- }
-
- @Override
- public DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
- final int index = in.read();
- if (index < 0) {
- throw new EOFException();
- }
- if (index >= NUM_UPDATE_TYPES) {
- throw new IOException("Corrupt stream; got UpdateType value of " + index + " but there are only " + NUM_UPDATE_TYPES + " valid values");
- }
- final UpdateType updateType = UpdateType.values()[index];
- final String id = in.readUTF();
- final DummyRecord record = new DummyRecord(id, updateType);
-
- if (record.getUpdateType() != UpdateType.DELETE) {
- final int numProps = in.readInt();
- for (int i = 0; i < numProps; i++) {
- final String key = in.readUTF();
- final String value = in.readUTF();
- record.setProperty(key, value);
- }
- }
- return record;
- }
-
- @Override
- public Object getRecordIdentifier(final DummyRecord record) {
- return record.getId();
- }
-
- @Override
- public UpdateType getUpdateType(final DummyRecord record) {
- return record.getUpdateType();
- }
-
- @Override
- public DummyRecord deserializeEdit(final DataInputStream in, final Map<Object, DummyRecord> currentVersion, final int version) throws IOException {
- return deserializeRecord(in, version);
- }
-
- @Override
- public int getVersion() {
- return 1;
- }
-
- public void setThrowIOEAfterNSerializeEdits(final int n) {
- this.throwIOEAfterNserializeEdits = n;
- }
-
- @Override
- public String getLocation(final DummyRecord record) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
deleted file mode 100644
index 57f3495..0000000
--- a/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wali;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestMinimalLockingWriteAheadLog {
-
- @Test
- public void testWrite() throws IOException, InterruptedException {
- final int numPartitions = 8;
-
- final Path path = Paths.get("target/minimal-locking-repo");
- deleteRecursively(path.toFile());
- assertTrue(path.toFile().mkdirs());
-
- final DummyRecordSerde serde = new DummyRecordSerde();
- final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
- final Collection<DummyRecord> initialRecs = repo.recoverRecords();
- assertTrue(initialRecs.isEmpty());
-
- final List<InsertThread> threads = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- threads.add(new InsertThread(10000, 1000000 * i, repo));
- }
-
- final long start = System.nanoTime();
- for (final InsertThread thread : threads) {
- thread.start();
- }
- for (final InsertThread thread : threads) {
- thread.join();
- }
- final long nanos = System.nanoTime() - start;
- final long millis = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
- System.out.println("Took " + millis + " millis to insert 1,000,000 records each in its own transaction");
- repo.shutdown();
-
- final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
- final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
- assertFalse(recoveredRecords.isEmpty());
- assertEquals(100000, recoveredRecords.size());
- for (final DummyRecord record : recoveredRecords) {
- final Map<String, String> recoveredProps = record.getProperties();
- assertEquals(1, recoveredProps.size());
- assertEquals("B", recoveredProps.get("A"));
- }
- }
-
- @Test
- public void testRecoverAfterIOException() throws IOException {
- final int numPartitions = 5;
- final Path path = Paths.get("target/minimal-locking-repo-test-recover-after-ioe");
- deleteRecursively(path.toFile());
- Files.createDirectories(path);
-
- final DummyRecordSerde serde = new DummyRecordSerde();
- final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
- final Collection<DummyRecord> initialRecs = repo.recoverRecords();
- assertTrue(initialRecs.isEmpty());
-
- serde.setThrowIOEAfterNSerializeEdits(7); // serialize the 2 transactions, then the first edit of the third transaction; then throw IOException
-
- final List<DummyRecord> firstTransaction = new ArrayList<>();
- firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
- firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
- firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
-
- final List<DummyRecord> secondTransaction = new ArrayList<>();
- secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
- secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
- secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));
-
- final List<DummyRecord> thirdTransaction = new ArrayList<>();
- thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
- thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
-
- repo.update(firstTransaction, true);
- repo.update(secondTransaction, true);
- try {
- repo.update(thirdTransaction, true);
- Assert.fail("Did not throw IOException on third transaction");
- } catch (final IOException e) {
- // expected behavior.
- }
-
- repo.shutdown();
-
- serde.setThrowIOEAfterNSerializeEdits(-1);
- final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
- final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
- assertFalse(recoveredRecords.isEmpty());
- assertEquals(3, recoveredRecords.size());
-
- boolean record1 = false, record2 = false, record3 = false;
- for (final DummyRecord record : recoveredRecords) {
- switch (record.getId()) {
- case "1":
- record1 = true;
- assertEquals("123", record.getProperty("abc"));
- break;
- case "2":
- record2 = true;
- assertEquals("123", record.getProperty("cba"));
- break;
- case "3":
- record3 = true;
- assertEquals("123", record.getProperty("aaa"));
- break;
- }
- }
-
- assertTrue(record1);
- assertTrue(record2);
- assertTrue(record3);
- }
-
-    /**
-     * Verifies that once every partition has been blacklisted, the log keeps rejecting updates even
-     * after serialization starts succeeding again, and that a fresh log still recovers the records
-     * written by the one transaction that was serialized successfully.
-     */
-    @Test
-    public void testCannotModifyLogAfterAllAreBlackListed() throws IOException {
-        final int numPartitions = 5;
-        final Path path = Paths.get("target/minimal-locking-repo-test-cannot-modify-after-all-blacklisted");
-        deleteRecursively(path.toFile());
-        Files.createDirectories(path);
-
-        final DummyRecordSerde serde = new DummyRecordSerde();
-        final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
-        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
-        assertTrue(initialRecs.isEmpty());
-
-        serde.setThrowIOEAfterNSerializeEdits(3); // serialize the first transaction, then fail on all subsequent transactions
-
-        final List<DummyRecord> firstTransaction = new ArrayList<>();
-        firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
-        firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
-        firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
-
-        final List<DummyRecord> secondTransaction = new ArrayList<>();
-        secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
-        secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
-        secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));
-
-        final List<DummyRecord> thirdTransaction = new ArrayList<>();
-        thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
-        thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
-
-        repo.update(firstTransaction, true);
-
-        try {
-            repo.update(secondTransaction, true);
-            Assert.fail("Did not throw IOException on second transaction");
-        } catch (final IOException e) {
-            // expected behavior.
-        }
-
-        // The second transaction already failed once; fail numPartitions - 1 more updates so the total number
-        // of failed updates equals the number of partitions.
-        // NOTE(review): assumes each failed update blacklists one distinct partition - confirm against
-        // MinimalLockingWriteAheadLog.
-        for (int i = 0; i < numPartitions - 1; i++) {
-            try {
-                repo.update(thirdTransaction, true);
-                Assert.fail("Did not throw IOException on third transaction");
-            } catch (final IOException e) {
-                // expected behavior.
-            }
-        }
-
-        // Stop injecting serialization failures (presumably -1 disables them). Every partition is already
-        // blacklisted, so this update must still be rejected.
-        serde.setThrowIOEAfterNSerializeEdits(-1);
-        final List<DummyRecord> fourthTransaction = new ArrayList<>();
-        fourthTransaction.add(new DummyRecord("1", UpdateType.DELETE));
-
-        try {
-            repo.update(fourthTransaction, true);
-            Assert.fail("Successfully updated repo for 4th transaction");
-        } catch (final IOException e) {
-            // expected behavior
-            assertTrue(e.getMessage().contains("All Partitions have been blacklisted"));
-        }
-
-        repo.shutdown();
-
-        // Recover with a fresh log over the same directory. It must also be shut down so its files are released.
-        final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
-        try {
-            final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
-            assertFalse(recoveredRecords.isEmpty());
-            assertEquals(3, recoveredRecords.size());
-        } finally {
-            recoverRepo.shutdown();
-        }
-    }
-
-    /**
-     * Verifies that a log configured with several directories ("stripes") spreads its partitions across
-     * them. Each stripe should receive an equal share of the partition directories, and each partition
-     * directory should contain exactly one file after the bulk insert.
-     */
-    @Test
-    public void testStriping() throws IOException {
-        final int numPartitions = 6;
-        final Path path = Paths.get("target/minimal-locking-repo-striped");
-        deleteRecursively(path.toFile());
-        Files.createDirectories(path);
-
-        final SortedSet<Path> paths = new TreeSet<>();
-        paths.add(path.resolve("stripe-1"));
-        paths.add(path.resolve("stripe-2"));
-
-        final DummyRecordSerde serde = new DummyRecordSerde();
-        final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(paths, numPartitions, serde, null);
-        try {
-            final Collection<DummyRecord> initialRecs = repo.recoverRecords();
-            assertTrue(initialRecs.isEmpty());
-
-            // run() is called directly, so the inserts happen synchronously on the test thread.
-            final InsertThread inserter = new InsertThread(100000, 0, repo);
-            inserter.run();
-
-            final int expectedPartitionsPerStripe = numPartitions / paths.size();
-            for (final Path partitionPath : paths) {
-                final File[] files = partitionPath.toFile().listFiles(new FileFilter() {
-                    @Override
-                    public boolean accept(final File pathname) {
-                        return pathname.getName().startsWith("partition");
-                    }
-                });
-                // listFiles() returns null when the directory is missing or unreadable; fail with a clear message.
-                Assert.assertNotNull("Could not list directory " + partitionPath, files);
-                assertEquals(expectedPartitionsPerStripe, files.length);
-
-                for (final File file : files) {
-                    final File[] journalFiles = file.listFiles();
-                    Assert.assertNotNull("Could not list directory " + file, journalFiles);
-                    assertEquals(1, journalFiles.length);
-                }
-            }
-
-            repo.checkpoint();
-        } finally {
-            // Release the log's files, as the other tests in this class do.
-            repo.shutdown();
-        }
-    }
-
-    /**
-     * Applies a pre-built sequence of single-record CREATE transactions to a repository.
-     * Record i gets id {@code startIndex + i} and property {@code A=B}. Only the final
-     * update is issued with {@code forceSync == true}.
-     */
-    private static class InsertThread extends Thread {
-
-        private final List<List<DummyRecord>> records;
-        private final WriteAheadRepository<DummyRecord> repo;
-
-        public InsertThread(final int numInsertions, final int startIndex, final WriteAheadRepository<DummyRecord> repo) {
-            records = new ArrayList<>();
-            for (int i = 0; i < numInsertions; i++) {
-                final DummyRecord record = new DummyRecord(String.valueOf(i + startIndex), UpdateType.CREATE);
-                record.setProperty("A", "B");
-                final List<DummyRecord> list = new ArrayList<>();
-                list.add(record);
-                records.add(list);
-            }
-            this.repo = repo;
-        }
-
-        @Override
-        public void run() {
-            try {
-                int counter = 0;
-                for (final List<DummyRecord> list : records) {
-                    // Sync only on the last transaction.
-                    final boolean forceSync = (++counter == records.size());
-                    repo.update(list, forceSync);
-                }
-            } catch (final IOException e) {
-                // Keep the IOException as the cause so its stack trace survives in the test report.
-                throw new AssertionError("Failed to update: " + e.toString(), e);
-            }
-        }
-    }
-
-    /**
-     * Best-effort removal of {@code file} and, when it is a directory, everything beneath it.
-     * Return values of {@link File#delete()} are ignored, so a path that does not exist is a no-op.
-     */
-    private void deleteRecursively(final File file) {
-        final File[] entries = file.listFiles();
-        for (int idx = 0; entries != null && idx < entries.length; idx++) {
-            deleteRecursively(entries[idx]);
-        }
-        file.delete();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/maven-plugins/nar-maven-plugin/pom.xml
----------------------------------------------------------------------
diff --git a/maven-plugins/nar-maven-plugin/pom.xml b/maven-plugins/nar-maven-plugin/pom.xml
new file mode 100644
index 0000000..c0236fd
--- /dev/null
+++ b/maven-plugins/nar-maven-plugin/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>maven-plugins</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <!-- This module is itself a Maven plugin (packaging maven-plugin). -->
+ <artifactId>nar-maven-plugin</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <packaging>maven-plugin</packaging>
+ <name>Apache NiFi NAR Plugin</name>
+ <description>Apache NiFi Nar Plugin. It is currently a part of the Apache Incubator.</description>
+ <!-- Both maven-plugin-plugin executions below (the descriptor and helpmojo goals) are bound to the
+      process-classes phase, so they run after this module's classes are compiled.
+      NOTE(review): presumably so mojo metadata can be read from the compiled classes; confirm. -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-plugin-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>default-descriptor</id>
+ <goals>
+ <goal>descriptor</goal>
+ </goals>
+ <phase>process-classes</phase>
+ </execution>
+ <execution>
+ <id>help-descriptor</id>
+ <goals>
+ <goal>helpmojo</goal>
+ </goals>
+ <phase>process-classes</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <!-- No dependency versions are declared here.
+      NOTE(review): versions are presumably managed by the parent maven-plugins POM; confirm. -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-plugin-api</artifactId>
+ </dependency>
+ <!-- Depended on as a plugin artifact (type maven-plugin).
+      NOTE(review): presumably its classes are reused by this plugin's mojos; confirm. -->
+ <dependency>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <type>maven-plugin</type>
+ </dependency>
+ <dependency>
+ <!-- No code from maven-jar-plugin is actually used; it's included
+ just to simplify the dependencies list. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </dependency>
+ <!-- Provided scope: the annotations are needed on the compile classpath but are not
+      declared as a runtime dependency of the plugin. -->
+ <dependency>
+ <groupId>org.apache.maven.plugin-tools</groupId>
+ <artifactId>maven-plugin-annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>