You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2016/06/30 02:21:35 UTC
[09/11] flume git commit: FLUME-2937. Integrate checkstyle for
non-test classes
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
index 488dcf4..336aa2c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
@@ -50,20 +50,17 @@ import java.util.concurrent.atomic.AtomicLong;
@InterfaceStability.Unstable
public abstract class LogFile {
- private static final Logger LOG = LoggerFactory
- .getLogger(LogFile.class);
-
+ private static final Logger LOG = LoggerFactory.getLogger(LogFile.class);
/**
* This class preallocates the data files 1MB at time to avoid
* the updating of the inode on each write and to avoid the disk
* filling up during a write. It's also faster, so there.
*/
- private static final ByteBuffer FILL = DirectMemoryUtils.
- allocate(1024 * 1024); // preallocation, 1MB
+ private static final ByteBuffer FILL = DirectMemoryUtils.allocate(1024 * 1024);
public static final byte OP_RECORD = Byte.MAX_VALUE;
- public static final byte OP_NOOP = (Byte.MAX_VALUE + Byte.MIN_VALUE)/2;
+ public static final byte OP_NOOP = (Byte.MAX_VALUE + Byte.MIN_VALUE) / 2;
public static final byte OP_EOF = Byte.MIN_VALUE;
static {
@@ -73,7 +70,7 @@ public abstract class LogFile {
}
protected static void skipRecord(RandomAccessFile fileHandle,
- int offset) throws IOException {
+ int offset) throws IOException {
fileHandle.seek(offset);
int length = fileHandle.readInt();
fileHandle.skipBytes(length);
@@ -93,31 +90,40 @@ public abstract class LogFile {
writeFileHandle = new RandomAccessFile(file, "rw");
}
+
protected RandomAccessFile getFileHandle() {
return writeFileHandle;
}
+
protected void setLastCheckpointOffset(long lastCheckpointOffset) {
this.lastCheckpointOffset = lastCheckpointOffset;
}
+
protected void setLastCheckpointWriteOrderID(long lastCheckpointWriteOrderID) {
this.lastCheckpointWriteOrderID = lastCheckpointWriteOrderID;
}
+
protected long getLastCheckpointOffset() {
return lastCheckpointOffset;
}
+
protected long getLastCheckpointWriteOrderID() {
return lastCheckpointWriteOrderID;
}
+
protected File getFile() {
return file;
}
+
protected int getLogFileID() {
return logFileID;
}
+
void markCheckpoint(long logWriteOrderID)
throws IOException {
markCheckpoint(lastCheckpointOffset, logWriteOrderID);
}
+
abstract void markCheckpoint(long currentPosition, long logWriteOrderID)
throws IOException;
@@ -150,9 +156,10 @@ public abstract class LogFile {
Preconditions.checkArgument(numBytes >= 0, "numBytes less than zero");
value.addAndGet(-numBytes);
}
+
long getUsableSpace() {
long now = System.currentTimeMillis();
- if(now - interval > lastRefresh.get()) {
+ if (now - interval > lastRefresh.get()) {
value.set(fs.getUsableSpace());
lastRefresh.set(now);
}
@@ -160,7 +167,7 @@ public abstract class LogFile {
}
}
- static abstract class Writer {
+ abstract static class Writer {
private final int logFileID;
private final File file;
private final long maxFileSize;
@@ -180,10 +187,9 @@ public abstract class LogFile {
// To ensure we can count the number of fsyncs.
private long syncCount;
-
Writer(File file, int logFileID, long maxFileSize,
- CipherProvider.Encryptor encryptor, long usableSpaceRefreshInterval,
- boolean fsyncPerTransaction, int fsyncInterval) throws IOException {
+ CipherProvider.Encryptor encryptor, long usableSpaceRefreshInterval,
+ boolean fsyncPerTransaction, int fsyncInterval) throws IOException {
this.file = file;
this.logFileID = logFileID;
this.maxFileSize = Math.min(maxFileSize,
@@ -193,7 +199,7 @@ public abstract class LogFile {
writeFileChannel = writeFileHandle.getChannel();
this.fsyncPerTransaction = fsyncPerTransaction;
this.fsyncInterval = fsyncInterval;
- if(!fsyncPerTransaction) {
+ if (!fsyncPerTransaction) {
LOG.info("Sync interval = " + fsyncInterval);
syncExecutor = Executors.newSingleThreadScheduledExecutor();
syncExecutor.scheduleWithFixedDelay(new Runnable() {
@@ -203,7 +209,7 @@ public abstract class LogFile {
sync();
} catch (Throwable ex) {
LOG.error("Data file, " + getFile().toString() + " could not " +
- "be synced to disk due to an error.", ex);
+ "be synced to disk due to an error.", ex);
}
}
}, fsyncInterval, fsyncInterval, TimeUnit.SECONDS);
@@ -220,6 +226,7 @@ public abstract class LogFile {
protected CipherProvider.Encryptor getEncryptor() {
return encryptor;
}
+
int getLogFileID() {
return logFileID;
}
@@ -227,6 +234,7 @@ public abstract class LogFile {
File getFile() {
return file;
}
+
String getParent() {
return file.getParent();
}
@@ -240,7 +248,7 @@ public abstract class LogFile {
}
@VisibleForTesting
- long getLastCommitPosition(){
+ long getLastCommitPosition() {
return lastCommitPosition;
}
@@ -253,6 +261,7 @@ public abstract class LogFile {
long getSyncCount() {
return syncCount;
}
+
synchronized long position() throws IOException {
return getFileChannel().position();
}
@@ -261,20 +270,22 @@ public abstract class LogFile {
// methods, so all methods need to be synchronized.
synchronized FlumeEventPointer put(ByteBuffer buffer) throws IOException {
- if(encryptor != null) {
+ if (encryptor != null) {
buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array()));
}
Pair<Integer, Integer> pair = write(buffer);
return new FlumeEventPointer(pair.getLeft(), pair.getRight());
}
+
synchronized void take(ByteBuffer buffer) throws IOException {
- if(encryptor != null) {
+ if (encryptor != null) {
buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array()));
}
write(buffer);
}
+
synchronized void rollback(ByteBuffer buffer) throws IOException {
- if(encryptor != null) {
+ if (encryptor != null) {
buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array()));
}
write(buffer);
@@ -290,20 +301,20 @@ public abstract class LogFile {
}
private Pair<Integer, Integer> write(ByteBuffer buffer)
- throws IOException {
- if(!isOpen()) {
+ throws IOException {
+ if (!isOpen()) {
throw new LogFileRetryableIOException("File closed " + file);
}
long length = position();
long expectedLength = length + (long) buffer.limit();
- if(expectedLength > maxFileSize) {
+ if (expectedLength > maxFileSize) {
throw new LogFileRetryableIOException(expectedLength + " > " +
maxFileSize);
}
- int offset = (int)length;
+ int offset = (int) length;
Preconditions.checkState(offset >= 0, String.valueOf(offset));
// OP_RECORD + size + buffer
- int recordLength = 1 + (int)Serialization.SIZE_OF_INT + buffer.limit();
+ int recordLength = 1 + (int) Serialization.SIZE_OF_INT + buffer.limit();
usableSpace.decrement(recordLength);
preallocate(recordLength);
ByteBuffer toWrite = ByteBuffer.allocate(recordLength);
@@ -323,15 +334,16 @@ public abstract class LogFile {
* Sync the underlying log file to disk. Expensive call,
* should be used only on commits. If a sync has already happened after
* the last commit, this method is a no-op
+ *
* @throws IOException
* @throws LogFileRetryableIOException - if this log file is closed.
*/
synchronized void sync() throws IOException {
if (!fsyncPerTransaction && !dirty) {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug(
- "No events written to file, " + getFile().toString() +
- " in last " + fsyncInterval + " or since last commit.");
+ "No events written to file, " + getFile().toString() +
+ " in last " + fsyncInterval + " or since last commit.");
}
return;
}
@@ -346,27 +358,29 @@ public abstract class LogFile {
}
}
-
protected boolean isOpen() {
return open;
}
+
protected RandomAccessFile getFileHandle() {
return writeFileHandle;
}
+
protected FileChannel getFileChannel() {
return writeFileChannel;
}
+
synchronized void close() {
- if(open) {
+ if (open) {
open = false;
if (!fsyncPerTransaction) {
// Shutdown the executor before attempting to close.
- if(syncExecutor != null) {
+ if (syncExecutor != null) {
// No need to wait for it to shutdown.
syncExecutor.shutdown();
}
}
- if(writeFileChannel.isOpen()) {
+ if (writeFileChannel.isOpen()) {
LOG.info("Closing " + file);
try {
writeFileChannel.force(true);
@@ -381,9 +395,10 @@ public abstract class LogFile {
}
}
}
+
protected void preallocate(int size) throws IOException {
long position = position();
- if(position + size > getFileChannel().size()) {
+ if (position + size > getFileChannel().size()) {
LOG.debug("Preallocating at position " + position);
synchronized (FILL) {
FILL.position(0);
@@ -404,7 +419,7 @@ public abstract class LogFile {
public OperationRecordUpdater(File file) throws FileNotFoundException {
Preconditions.checkState(file.exists(), "File to update, " +
- file.toString() + " does not exist.");
+ file.toString() + " does not exist.");
this.file = file;
fileHandle = new RandomAccessFile(file, "rw");
}
@@ -417,10 +432,10 @@ public abstract class LogFile {
fileHandle.seek(offset);
byte byteRead = fileHandle.readByte();
Preconditions.checkState(byteRead == OP_RECORD || byteRead == OP_NOOP,
- "Expected to read a record but the byte read indicates EOF");
+ "Expected to read a record but the byte read indicates EOF");
fileHandle.seek(offset);
LOG.info("Marking event as " + OP_NOOP + " at " + offset + " for file " +
- file.toString());
+ file.toString());
fileHandle.writeByte(OP_NOOP);
}
@@ -430,20 +445,21 @@ public abstract class LogFile {
fileHandle.close();
} catch (IOException e) {
LOG.error("Could not close file handle to file " +
- fileHandle.toString(), e);
+ fileHandle.toString(), e);
}
}
}
- static abstract class RandomReader {
+ abstract static class RandomReader {
private final File file;
private final BlockingQueue<RandomAccessFile> readFileHandles =
new ArrayBlockingQueue<RandomAccessFile>(50, true);
private final KeyProvider encryptionKeyProvider;
private final boolean fsyncPerTransaction;
private volatile boolean open;
+
public RandomReader(File file, @Nullable KeyProvider
- encryptionKeyProvider, boolean fsyncPerTransaction)
+ encryptionKeyProvider, boolean fsyncPerTransaction)
throws IOException {
this.file = file;
this.encryptionKeyProvider = encryptionKeyProvider;
@@ -466,31 +482,31 @@ public abstract class LogFile {
}
FlumeEvent get(int offset) throws IOException, InterruptedException,
- CorruptEventException, NoopRecordException {
+ CorruptEventException, NoopRecordException {
Preconditions.checkState(open, "File closed");
RandomAccessFile fileHandle = checkOut();
boolean error = true;
try {
fileHandle.seek(offset);
byte operation = fileHandle.readByte();
- if(operation == OP_NOOP) {
+ if (operation == OP_NOOP) {
throw new NoopRecordException("No op record found. Corrupt record " +
- "may have been repaired by File Channel Integrity tool");
+ "may have been repaired by File Channel Integrity tool");
}
if (operation != OP_RECORD) {
throw new CorruptEventException(
- "Operation code is invalid. File " +
- "is corrupt. Please run File Channel Integrity tool.");
+ "Operation code is invalid. File " +
+ "is corrupt. Please run File Channel Integrity tool.");
}
TransactionEventRecord record = doGet(fileHandle);
- if(!(record instanceof Put)) {
+ if (!(record instanceof Put)) {
Preconditions.checkState(false, "Record is " +
record.getClass().getSimpleName());
}
error = false;
- return ((Put)record).getEvent();
+ return ((Put) record).getEvent();
} finally {
- if(error) {
+ if (error) {
close(fileHandle, file);
} else {
checkIn(fileHandle);
@@ -499,12 +515,12 @@ public abstract class LogFile {
}
synchronized void close() {
- if(open) {
+ if (open) {
open = false;
LOG.info("Closing RandomReader " + file);
List<RandomAccessFile> fileHandles = Lists.newArrayList();
- while(readFileHandles.drainTo(fileHandles) > 0) {
- for(RandomAccessFile fileHandle : fileHandles) {
+ while (readFileHandles.drainTo(fileHandles) > 0) {
+ for (RandomAccessFile fileHandle : fileHandles) {
synchronized (fileHandle) {
try {
fileHandle.close();
@@ -528,7 +544,7 @@ public abstract class LogFile {
}
private void checkIn(RandomAccessFile fileHandle) {
- if(!readFileHandles.offer(fileHandle)) {
+ if (!readFileHandles.offer(fileHandle)) {
close(fileHandle, file);
}
}
@@ -536,19 +552,20 @@ public abstract class LogFile {
private RandomAccessFile checkOut()
throws IOException, InterruptedException {
RandomAccessFile fileHandle = readFileHandles.poll();
- if(fileHandle != null) {
+ if (fileHandle != null) {
return fileHandle;
}
int remaining = readFileHandles.remainingCapacity();
- if(remaining > 0) {
+ if (remaining > 0) {
LOG.info("Opening " + file + " for read, remaining number of file " +
- "handles available for reads of this file is " + remaining);
+ "handles available for reads of this file is " + remaining);
return open();
}
return readFileHandles.take();
}
+
private static void close(RandomAccessFile fileHandle, File file) {
- if(fileHandle != null) {
+ if (fileHandle != null) {
try {
fileHandle.close();
} catch (IOException e) {
@@ -558,7 +575,7 @@ public abstract class LogFile {
}
}
- public static abstract class SequentialReader {
+ public abstract static class SequentialReader {
private final RandomAccessFile fileHandle;
private final FileChannel fileChannel;
@@ -573,8 +590,9 @@ public abstract class LogFile {
/**
* Construct a Sequential Log Reader object
+ *
* @param file
- * @throws IOException if an I/O error occurs
+ * @throws IOException if an I/O error occurs
* @throws EOFException if the file is empty
*/
SequentialReader(File file, @Nullable KeyProvider encryptionKeyProvider)
@@ -584,6 +602,7 @@ public abstract class LogFile {
fileHandle = new RandomAccessFile(file, "r");
fileChannel = fileHandle.getChannel();
}
+
abstract LogRecord doNext(int offset) throws IOException, CorruptEventException;
abstract int getVersion();
@@ -591,50 +610,57 @@ public abstract class LogFile {
protected void setLastCheckpointPosition(long lastCheckpointPosition) {
this.lastCheckpointPosition = lastCheckpointPosition;
}
+
protected void setLastCheckpointWriteOrderID(long lastCheckpointWriteOrderID) {
this.lastCheckpointWriteOrderID = lastCheckpointWriteOrderID;
}
+
protected void setPreviousCheckpointPosition(
- long backupCheckpointPosition) {
+ long backupCheckpointPosition) {
this.backupCheckpointPosition = backupCheckpointPosition;
}
+
protected void setPreviousCheckpointWriteOrderID(
- long backupCheckpointWriteOrderID) {
+ long backupCheckpointWriteOrderID) {
this.backupCheckpointWriteOrderID = backupCheckpointWriteOrderID;
}
+
protected void setLogFileID(int logFileID) {
this.logFileID = logFileID;
Preconditions.checkArgument(logFileID >= 0, "LogFileID is not positive: "
+ Integer.toHexString(logFileID));
}
+
protected KeyProvider getKeyProvider() {
return encryptionKeyProvider;
}
+
protected RandomAccessFile getFileHandle() {
return fileHandle;
}
+
int getLogFileID() {
return logFileID;
}
void skipToLastCheckpointPosition(long checkpointWriteOrderID)
- throws IOException {
+ throws IOException {
if (lastCheckpointPosition > 0L) {
long position = 0;
if (lastCheckpointWriteOrderID <= checkpointWriteOrderID) {
position = lastCheckpointPosition;
} else if (backupCheckpointWriteOrderID <= checkpointWriteOrderID
- && backupCheckpointPosition > 0) {
+ && backupCheckpointPosition > 0) {
position = backupCheckpointPosition;
}
fileChannel.position(position);
LOG.info("fast-forward to checkpoint position: " + position);
} else {
LOG.info("Checkpoint for file(" + file.getAbsolutePath() + ") "
- + "is: " + lastCheckpointWriteOrderID + ", which is beyond the "
- + "requested checkpoint time: " + checkpointWriteOrderID
- + " and position " + lastCheckpointPosition);
+ + "is: " + lastCheckpointWriteOrderID + ", which is beyond the "
+ + "requested checkpoint time: " + checkpointWriteOrderID
+ + " and position " + lastCheckpointPosition);
}
}
@@ -644,8 +670,8 @@ public abstract class LogFile {
long position = fileChannel.position();
if (position > FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE) {
LOG.info("File position exceeds the threshold: "
- + FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE
- + ", position: " + position);
+ + FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE
+ + ", position: " + position);
}
offset = (int) position;
Preconditions.checkState(offset >= 0);
@@ -658,21 +684,21 @@ public abstract class LogFile {
return null;
} else if (operation == OP_NOOP) {
LOG.info("No op event found in file: " + file.toString() +
- " at " + offset + ". Skipping event.");
+ " at " + offset + ". Skipping event.");
skipRecord(fileHandle, offset + 1);
offset = (int) fileHandle.getFilePointer();
continue;
} else {
LOG.error("Encountered non op-record at " + offset + " " +
- Integer.toHexString(operation) + " in " + file);
+ Integer.toHexString(operation) + " in " + file);
return null;
}
}
- if(offset >= fileHandle.length()) {
+ if (offset >= fileHandle.length()) {
return null;
}
return doNext(offset);
- } catch(EOFException e) {
+ } catch (EOFException e) {
return null;
} catch (IOException e) {
throw new IOException("Unable to read next Transaction from log file " +
@@ -683,33 +709,36 @@ public abstract class LogFile {
public long getPosition() throws IOException {
return fileChannel.position();
}
+
public void close() {
- if(fileHandle != null) {
+ if (fileHandle != null) {
try {
fileHandle.close();
- } catch (IOException e) {}
+ } catch (IOException e) {
+ }
}
}
}
+
protected static void writeDelimitedBuffer(ByteBuffer output, ByteBuffer buffer)
throws IOException {
output.putInt(buffer.limit());
output.put(buffer);
}
+
protected static byte[] readDelimitedBuffer(RandomAccessFile fileHandle)
throws IOException, CorruptEventException {
int length = fileHandle.readInt();
if (length < 0) {
- throw new CorruptEventException("Length of event is: " + String.valueOf
- (length) + ". Event must have length >= 0. Possible corruption of " +
- "data or partial fsync.");
+ throw new CorruptEventException("Length of event is: " + String.valueOf(length) +
+ ". Event must have length >= 0. Possible corruption of data or partial fsync.");
}
byte[] buffer = new byte[length];
try {
fileHandle.readFully(buffer);
} catch (EOFException ex) {
throw new CorruptEventException("Remaining data in file less than " +
- "expected size of event.", ex);
+ "expected size of event.", ex);
}
return buffer;
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
index 7d7fd85..f2fcad6 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
@@ -18,24 +18,23 @@
*/
package org.apache.flume.channel.file;
+import com.google.common.base.Preconditions;
+import org.apache.flume.channel.file.encryption.KeyProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.security.Key;
-import javax.annotation.Nullable;
-
-import org.apache.flume.channel.file.encryption.KeyProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
@SuppressWarnings("deprecation")
class LogFileFactory {
private static final Logger LOGGER =
LoggerFactory.getLogger(LogFileFactory.class);
+
private LogFileFactory() {}
static LogFile.MetaDataWriter getMetaDataWriter(File file, int logFileID)
@@ -43,21 +42,21 @@ class LogFileFactory {
RandomAccessFile logFile = null;
try {
File metaDataFile = Serialization.getMetaDataFile(file);
- if(metaDataFile.exists()) {
+ if (metaDataFile.exists()) {
return new LogFileV3.MetaDataWriter(file, logFileID);
}
logFile = new RandomAccessFile(file, "r");
int version = logFile.readInt();
- if(Serialization.VERSION_2 == version) {
+ if (Serialization.VERSION_2 == version) {
return new LogFileV2.MetaDataWriter(file, logFileID);
}
throw new IOException("File " + file + " has bad version " +
Integer.toHexString(version));
} finally {
- if(logFile != null) {
+ if (logFile != null) {
try {
logFile.close();
- } catch(IOException e) {
+ } catch (IOException e) {
LOGGER.warn("Unable to close " + file, e);
}
}
@@ -65,13 +64,13 @@ class LogFileFactory {
}
static LogFile.Writer getWriter(File file, int logFileID,
- long maxFileSize, @Nullable Key encryptionKey,
- @Nullable String encryptionKeyAlias,
- @Nullable String encryptionCipherProvider,
- long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
- int fsyncInterval) throws IOException {
- Preconditions.checkState(!file.exists(), "File already exists " +
- file.getAbsolutePath());
+ long maxFileSize, @Nullable Key encryptionKey,
+ @Nullable String encryptionKeyAlias,
+ @Nullable String encryptionCipherProvider,
+ long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
+ int fsyncInterval) throws IOException {
+ Preconditions.checkState(!file.exists(), "File already exists " +
+ file.getAbsolutePath());
Preconditions.checkState(file.createNewFile(), "File could not be created "
+ file.getAbsolutePath());
return new LogFileV3.Writer(file, logFileID, maxFileSize, encryptionKey,
@@ -80,28 +79,29 @@ class LogFileFactory {
}
static LogFile.RandomReader getRandomReader(File file,
- @Nullable KeyProvider encryptionKeyProvider, boolean fsyncPerTransaction)
+ @Nullable KeyProvider encryptionKeyProvider,
+ boolean fsyncPerTransaction)
throws IOException {
RandomAccessFile logFile = new RandomAccessFile(file, "r");
try {
File metaDataFile = Serialization.getMetaDataFile(file);
// either this is a rr for a just created file or
// the metadata file exists and as such it's V3
- if(logFile.length() == 0L || metaDataFile.exists()) {
+ if (logFile.length() == 0L || metaDataFile.exists()) {
return new LogFileV3.RandomReader(file, encryptionKeyProvider,
- fsyncPerTransaction);
+ fsyncPerTransaction);
}
int version = logFile.readInt();
- if(Serialization.VERSION_2 == version) {
+ if (Serialization.VERSION_2 == version) {
return new LogFileV2.RandomReader(file);
}
throw new IOException("File " + file + " has bad version " +
Integer.toHexString(version));
} finally {
- if(logFile != null) {
+ if (logFile != null) {
try {
logFile.close();
- } catch(IOException e) {
+ } catch (IOException e) {
LOGGER.warn("Unable to close " + file, e);
}
}
@@ -109,7 +109,8 @@ class LogFileFactory {
}
static LogFile.SequentialReader getSequentialReader(File file,
- @Nullable KeyProvider encryptionKeyProvider, boolean fsyncPerTransaction)
+ @Nullable KeyProvider encryptionKeyProvider,
+ boolean fsyncPerTransaction)
throws IOException {
RandomAccessFile logFile = null;
try {
@@ -134,27 +135,27 @@ class LogFileFactory {
hasMeta = true;
} else {
throw new IOException("Renaming of " + tempMetadataFile.getName()
- + " to " + metaDataFile.getName() + " failed");
+ + " to " + metaDataFile.getName() + " failed");
}
} else if (oldMetadataFile.exists()) {
if (oldMetadataFile.renameTo(metaDataFile)) {
hasMeta = true;
} else {
throw new IOException("Renaming of " + oldMetadataFile.getName()
- + " to " + metaDataFile.getName() + " failed");
+ + " to " + metaDataFile.getName() + " failed");
}
}
if (hasMeta) {
// Now the metadata file has been found, delete old or temp files
// so it does not interfere with normal operation.
- if(oldMetadataFile.exists()) {
+ if (oldMetadataFile.exists()) {
oldMetadataFile.delete();
}
- if(tempMetadataFile.exists()) {
+ if (tempMetadataFile.exists()) {
tempMetadataFile.delete();
}
- if(metaDataFile.length() == 0L) {
- if(file.length() != 0L) {
+ if (metaDataFile.length() == 0L) {
+ if (file.length() != 0L) {
String msg = String.format("MetaData file %s is empty, but log %s" +
" is of size %d", metaDataFile, file, file.length());
throw new IllegalStateException(msg);
@@ -163,20 +164,20 @@ class LogFileFactory {
metaDataFile));
}
return new LogFileV3.SequentialReader(file, encryptionKeyProvider,
- fsyncPerTransaction);
+ fsyncPerTransaction);
}
logFile = new RandomAccessFile(file, "r");
int version = logFile.readInt();
- if(Serialization.VERSION_2 == version) {
+ if (Serialization.VERSION_2 == version) {
return new LogFileV2.SequentialReader(file);
}
throw new IOException("File " + file + " has bad version " +
Integer.toHexString(version));
} finally {
- if(logFile != null) {
+ if (logFile != null) {
try {
logFile.close();
- } catch(IOException e) {
+ } catch (IOException e) {
LOGGER.warn("Unable to close " + file, e);
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java
index 9447652..b0377ab 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java
@@ -22,12 +22,15 @@ import java.io.IOException;
public class LogFileRetryableIOException extends IOException {
private static final long serialVersionUID = -2747112999806160431L;
+
public LogFileRetryableIOException() {
super();
}
+
public LogFileRetryableIOException(String msg) {
super(msg);
}
+
public LogFileRetryableIOException(String msg, Throwable t) {
super(msg, t);
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
index bb25e95..62b8cb9 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
@@ -65,7 +65,7 @@ class LogFileV2 extends LogFile {
+ ", logWriteOrderID: " + getLastCheckpointWriteOrderID());
error = false;
} finally {
- if(error) {
+ if (error) {
close();
}
}
@@ -108,6 +108,7 @@ class LogFileV2 extends LogFile {
getFileChannel().force(true);
}
+
@Override
int getVersion() {
return Serialization.VERSION_2;
@@ -115,29 +116,27 @@ class LogFileV2 extends LogFile {
}
static class RandomReader extends LogFile.RandomReader {
- RandomReader(File file)
- throws IOException {
+ RandomReader(File file) throws IOException {
super(file, null, true);
}
+
@Override
int getVersion() {
return Serialization.VERSION_2;
}
+
@Override
- protected TransactionEventRecord doGet(RandomAccessFile fileHandle)
- throws IOException {
+ protected TransactionEventRecord doGet(RandomAccessFile fileHandle) throws IOException {
return TransactionEventRecord.fromDataInputV2(fileHandle);
}
}
static class SequentialReader extends LogFile.SequentialReader {
-
- SequentialReader(File file)
- throws EOFException, IOException {
+ SequentialReader(File file) throws EOFException, IOException {
super(file, null);
RandomAccessFile fileHandle = getFileHandle();
int version = fileHandle.readInt();
- if(version != getVersion()) {
+ if (version != getVersion()) {
throw new IOException("Version is " + Integer.toHexString(version) +
" expected " + Integer.toHexString(getVersion())
+ " file: " + file.getCanonicalPath());
@@ -146,10 +145,12 @@ class LogFileV2 extends LogFile {
setLastCheckpointPosition(fileHandle.readLong());
setLastCheckpointWriteOrderID(fileHandle.readLong());
}
+
@Override
public int getVersion() {
return Serialization.VERSION_2;
}
+
@Override
LogRecord doNext(int offset) throws IOException {
TransactionEventRecord event =
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
index 9b0ef93..b459947 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
@@ -21,7 +21,6 @@ package org.apache.flume.channel.file;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessage;
-import org.apache.flume.Transaction;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.channel.file.encryption.CipherProvider;
@@ -58,13 +57,14 @@ public class LogFileV3 extends LogFile {
static class MetaDataWriter extends LogFile.MetaDataWriter {
private ProtosFactory.LogFileMetaData logFileMetaData;
private final File metaDataFile;
+
protected MetaDataWriter(File logFile, int logFileID) throws IOException {
super(logFile, logFileID);
metaDataFile = Serialization.getMetaDataFile(logFile);
MetaDataReader metaDataReader = new MetaDataReader(logFile, logFileID);
logFileMetaData = metaDataReader.read();
int version = logFileMetaData.getVersion();
- if(version != getVersion()) {
+ if (version != getVersion()) {
throw new IOException("Version is " + Integer.toHexString(version) +
" expected " + Integer.toHexString(getVersion())
+ " file: " + logFile);
@@ -90,9 +90,9 @@ public class LogFileV3 extends LogFile {
* would be possible to recover from a backup.
*/
metaDataBuilder.setBackupCheckpointPosition(logFileMetaData
- .getCheckpointPosition());
+ .getCheckpointPosition());
metaDataBuilder.setBackupCheckpointWriteOrderID(logFileMetaData
- .getCheckpointWriteOrderID());
+ .getCheckpointWriteOrderID());
logFileMetaData = metaDataBuilder.build();
writeDelimitedTo(logFileMetaData, metaDataFile);
}
@@ -102,17 +102,19 @@ public class LogFileV3 extends LogFile {
private final File logFile;
private final File metaDataFile;
private final int logFileID;
+
protected MetaDataReader(File logFile, int logFileID) throws IOException {
this.logFile = logFile;
metaDataFile = Serialization.getMetaDataFile(logFile);
this.logFileID = logFileID;
}
+
ProtosFactory.LogFileMetaData read() throws IOException {
FileInputStream inputStream = new FileInputStream(metaDataFile);
try {
ProtosFactory.LogFileMetaData metaData = Preconditions.checkNotNull(
- ProtosFactory.LogFileMetaData.
- parseDelimitedFrom(inputStream), "Metadata cannot be null");
+ ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream),
+ "Metadata cannot be null");
if (metaData.getLogFileID() != logFileID) {
throw new IOException("The file id of log file: "
+ logFile + " is different from expected "
@@ -123,7 +125,7 @@ public class LogFileV3 extends LogFile {
} finally {
try {
inputStream.close();
- } catch(IOException e) {
+ } catch (IOException e) {
LOGGER.warn("Unable to close " + metaDataFile, e);
}
}
@@ -133,13 +135,14 @@ public class LogFileV3 extends LogFile {
/**
* Writes a GeneratedMessage to a temp file, synchronizes it to disk
* and then renames the file over file.
- * @param msg GeneratedMessage to write to the file
+ *
+ * @param msg GeneratedMessage to write to the file
* @param file destination file
* @throws IOException if a write error occurs or the File.renameTo
- * method returns false meaning the file could not be overwritten.
+ * method returns false meaning the file could not be overwritten.
*/
public static void writeDelimitedTo(GeneratedMessage msg, File file)
- throws IOException {
+ throws IOException {
File tmp = Serialization.getMetaDataTempFile(file);
FileOutputStream outputStream = new FileOutputStream(tmp);
boolean closed = false;
@@ -148,26 +151,26 @@ public class LogFileV3 extends LogFile {
outputStream.getChannel().force(true);
outputStream.close();
closed = true;
- if(!tmp.renameTo(file)) {
+ if (!tmp.renameTo(file)) {
//Some platforms don't support moving over an existing file.
//So:
//log.meta -> log.meta.old
//log.meta.tmp -> log.meta
//delete log.meta.old
File oldFile = Serialization.getOldMetaDataFile(file);
- if(!file.renameTo(oldFile)){
+ if (!file.renameTo(oldFile)) {
throw new IOException("Unable to rename " + file + " to " + oldFile);
}
- if(!tmp.renameTo(file)) {
+ if (!tmp.renameTo(file)) {
throw new IOException("Unable to rename " + tmp + " over " + file);
}
oldFile.delete();
}
} finally {
- if(!closed) {
+ if (!closed) {
try {
outputStream.close();
- } catch(IOException e) {
+ } catch (IOException e) {
LOGGER.warn("Unable to close " + tmp, e);
}
}
@@ -177,17 +180,17 @@ public class LogFileV3 extends LogFile {
static class Writer extends LogFile.Writer {
Writer(File file, int logFileID, long maxFileSize,
- @Nullable Key encryptionKey,
- @Nullable String encryptionKeyAlias,
- @Nullable String encryptionCipherProvider,
- long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
- int fsyncInterval) throws IOException {
- super(file, logFileID, maxFileSize, CipherProviderFactory.
- getEncrypter(encryptionCipherProvider, encryptionKey),
- usableSpaceRefreshInterval, fsyncPerTransaction, fsyncInterval);
+ @Nullable Key encryptionKey,
+ @Nullable String encryptionKeyAlias,
+ @Nullable String encryptionCipherProvider,
+ long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
+ int fsyncInterval) throws IOException {
+ super(file, logFileID, maxFileSize,
+ CipherProviderFactory.getEncrypter(encryptionCipherProvider, encryptionKey),
+ usableSpaceRefreshInterval, fsyncPerTransaction, fsyncInterval);
ProtosFactory.LogFileMetaData.Builder metaDataBuilder =
ProtosFactory.LogFileMetaData.newBuilder();
- if(encryptionKey != null) {
+ if (encryptionKey != null) {
Preconditions.checkNotNull(encryptionKeyAlias, "encryptionKeyAlias");
Preconditions.checkNotNull(encryptionCipherProvider,
"encryptionCipherProvider");
@@ -208,6 +211,7 @@ public class LogFileV3 extends LogFile {
File metaDataFile = Serialization.getMetaDataFile(file);
writeDelimitedTo(metaDataBuilder.build(), metaDataFile);
}
+
@Override
int getVersion() {
return Serialization.VERSION_3;
@@ -221,28 +225,29 @@ public class LogFileV3 extends LogFile {
private volatile String cipherProvider;
private volatile byte[] parameters;
private BlockingQueue<CipherProvider.Decryptor> decryptors =
- new LinkedBlockingDeque<CipherProvider.Decryptor>();
+ new LinkedBlockingDeque<CipherProvider.Decryptor>();
RandomReader(File file, @Nullable KeyProvider encryptionKeyProvider,
- boolean fsyncPerTransaction) throws IOException {
+ boolean fsyncPerTransaction) throws IOException {
super(file, encryptionKeyProvider, fsyncPerTransaction);
}
+
private void initialize() throws IOException {
File metaDataFile = Serialization.getMetaDataFile(getFile());
FileInputStream inputStream = new FileInputStream(metaDataFile);
try {
- ProtosFactory.LogFileMetaData metaData =
- Preconditions.checkNotNull(ProtosFactory.LogFileMetaData.
- parseDelimitedFrom(inputStream), "MetaData cannot be null");
+ ProtosFactory.LogFileMetaData metaData = Preconditions.checkNotNull(
+ ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream),
+ "MetaData cannot be null");
int version = metaData.getVersion();
- if(version != getVersion()) {
+ if (version != getVersion()) {
throw new IOException("Version is " + Integer.toHexString(version) +
" expected " + Integer.toHexString(getVersion())
+ " file: " + getFile().getCanonicalPath());
}
encryptionEnabled = false;
- if(metaData.hasEncryption()) {
- if(getKeyProvider() == null) {
+ if (metaData.hasEncryption()) {
+ if (getKeyProvider() == null) {
throw new IllegalStateException("Data file is encrypted but no " +
" provider was specified");
}
@@ -255,23 +260,26 @@ public class LogFileV3 extends LogFile {
} finally {
try {
inputStream.close();
- } catch(IOException e) {
+ } catch (IOException e) {
LOGGER.warn("Unable to close " + metaDataFile, e);
}
}
}
+
private CipherProvider.Decryptor getDecryptor() {
CipherProvider.Decryptor decryptor = decryptors.poll();
- if(decryptor == null) {
+ if (decryptor == null) {
decryptor = CipherProviderFactory.getDecrypter(cipherProvider, key,
parameters);
}
return decryptor;
}
+
@Override
int getVersion() {
return Serialization.VERSION_3;
}
+
@Override
protected TransactionEventRecord doGet(RandomAccessFile fileHandle)
throws IOException, CorruptEventException {
@@ -279,7 +287,7 @@ public class LogFileV3 extends LogFile {
// empty. As such we wait to initialize until there is some
// data before we we initialize
synchronized (this) {
- if(!initialized) {
+ if (!initialized) {
initialized = true;
initialize();
}
@@ -288,18 +296,17 @@ public class LogFileV3 extends LogFile {
CipherProvider.Decryptor decryptor = null;
try {
byte[] buffer = readDelimitedBuffer(fileHandle);
- if(encryptionEnabled) {
+ if (encryptionEnabled) {
decryptor = getDecryptor();
buffer = decryptor.decrypt(buffer);
}
- TransactionEventRecord event = TransactionEventRecord.
- fromByteArray(buffer);
+ TransactionEventRecord event = TransactionEventRecord.fromByteArray(buffer);
success = true;
return event;
- } catch(DecryptionFailureException ex) {
+ } catch (DecryptionFailureException ex) {
throw new CorruptEventException("Error decrypting event", ex);
} finally {
- if(success && encryptionEnabled && decryptor != null) {
+ if (success && encryptionEnabled && decryptor != null) {
decryptors.offer(decryptor);
}
}
@@ -309,9 +316,10 @@ public class LogFileV3 extends LogFile {
public static class SequentialReader extends LogFile.SequentialReader {
private CipherProvider.Decryptor decryptor;
private final boolean fsyncPerTransaction;
+
public SequentialReader(File file, @Nullable KeyProvider
- encryptionKeyProvider, boolean fsyncPerTransaction) throws EOFException,
- IOException {
+ encryptionKeyProvider, boolean fsyncPerTransaction) throws EOFException,
+ IOException {
super(file, encryptionKeyProvider);
this.fsyncPerTransaction = fsyncPerTransaction;
File metaDataFile = Serialization.getMetaDataFile(file);
@@ -321,32 +329,31 @@ public class LogFileV3 extends LogFile {
ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream),
"MetaData cannot be null");
int version = metaData.getVersion();
- if(version != getVersion()) {
+ if (version != getVersion()) {
throw new IOException("Version is " + Integer.toHexString(version) +
" expected " + Integer.toHexString(getVersion())
+ " file: " + file.getCanonicalPath());
}
- if(metaData.hasEncryption()) {
- if(getKeyProvider() == null) {
+ if (metaData.hasEncryption()) {
+ if (getKeyProvider() == null) {
throw new IllegalStateException("Data file is encrypted but no " +
" provider was specified");
}
ProtosFactory.LogFileEncryption encryption = metaData.getEncryption();
Key key = getKeyProvider().getKey(encryption.getKeyAlias());
- decryptor = CipherProviderFactory.
- getDecrypter(encryption.getCipherProvider(), key,
- encryption.getParameters().toByteArray());
+ decryptor = CipherProviderFactory.getDecrypter(
+ encryption.getCipherProvider(), key, encryption.getParameters().toByteArray());
}
setLogFileID(metaData.getLogFileID());
setLastCheckpointPosition(metaData.getCheckpointPosition());
setLastCheckpointWriteOrderID(metaData.getCheckpointWriteOrderID());
setPreviousCheckpointPosition(metaData.getBackupCheckpointPosition());
setPreviousCheckpointWriteOrderID(
- metaData.getBackupCheckpointWriteOrderID());
+ metaData.getBackupCheckpointWriteOrderID());
} finally {
try {
inputStream.close();
- } catch(IOException e) {
+ } catch (IOException e) {
LOGGER.warn("Unable to close " + metaDataFile, e);
}
}
@@ -359,7 +366,7 @@ public class LogFileV3 extends LogFile {
@Override
LogRecord doNext(int offset) throws IOException, CorruptEventException,
- DecryptionFailureException {
+ DecryptionFailureException {
byte[] buffer = null;
TransactionEventRecord event = null;
try {
@@ -370,7 +377,7 @@ public class LogFileV3 extends LogFile {
event = TransactionEventRecord.fromByteArray(buffer);
} catch (CorruptEventException ex) {
LOGGER.warn("Corrupt file found. File id: log-" + this.getLogFileID(),
- ex);
+ ex);
// Return null so that replay handler thinks all events in this file
// have been taken.
if (!fsyncPerTransaction) {
@@ -380,7 +387,7 @@ public class LogFileV3 extends LogFile {
} catch (DecryptionFailureException ex) {
if (!fsyncPerTransaction) {
LOGGER.warn("Could not decrypt even read from channel. Skipping " +
- "event.", ex);
+ "event.", ex);
return null;
}
throw ex;
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogRecord.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogRecord.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogRecord.java
index 19ad0d6..5a75627 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogRecord.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogRecord.java
@@ -20,15 +20,17 @@ package org.apache.flume.channel.file;
import java.util.Arrays;
-
public class LogRecord implements Comparable<LogRecord> {
- private int fileID, offset;
+ private int fileID;
+ private int offset;
private TransactionEventRecord event;
+
public LogRecord(int fileID, int offset, TransactionEventRecord event) {
this.fileID = fileID;
this.offset = offset;
this.event = event;
}
+
public int getFileID() {
return fileID;
}
@@ -41,20 +43,16 @@ public class LogRecord implements Comparable<LogRecord> {
@Override
public int compareTo(LogRecord o) {
- int result = new Long(event.getLogWriteOrderID())
- .compareTo(o.getEvent().getLogWriteOrderID());
- if(result == 0) {
+ int result = new Long(event.getLogWriteOrderID()).compareTo(o.getEvent().getLogWriteOrderID());
+ if (result == 0) {
// oops we have hit a flume-1.2 bug. let's try and use the txid
// to replay the events
- result = new Long(event.getTransactionID())
- .compareTo(o.getEvent().getTransactionID());
- if(result == 0) {
+ result = new Long(event.getTransactionID()).compareTo(o.getEvent().getTransactionID());
+ if (result == 0) {
// events are within the same transaction. Basically we want commit
// and rollback to come after take and put
- Integer thisIndex = Arrays.binarySearch(replaySortOrder,
- event.getRecordType());
- Integer thatIndex = Arrays.binarySearch(replaySortOrder,
- o.getEvent().getRecordType());
+ Integer thisIndex = Arrays.binarySearch(replaySortOrder, event.getRecordType());
+ Integer thatIndex = Arrays.binarySearch(replaySortOrder, o.getEvent().getRecordType());
return thisIndex.compareTo(thatIndex);
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
index d1498c2..48177d0 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
@@ -64,7 +64,7 @@ public class LogUtils {
static List<File> getLogs(File logDir) {
List<File> result = Lists.newArrayList();
File[] files = logDir.listFiles();
- if(files == null) {
+ if (files == null) {
String msg = logDir + ".listFiles() returned null: ";
msg += "File = " + logDir.isFile() + ", ";
msg += "Exists = " + logDir.exists() + ", ";
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java
index dfcdd73..b74ff7b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java
@@ -19,19 +19,21 @@
package org.apache.flume.channel.file;
class Pair<L,R> {
-
private final L left;
private final R right;
+
Pair(L l, R r) {
left = l;
right = r;
}
+
L getLeft() {
return left;
}
R getRight() {
return right;
}
+
static <L, R> Pair<L, R> of(L left, R right) {
return new Pair<L, R>(left, right);
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
index f08f024..0a70a24 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
@@ -78,8 +78,8 @@ class Put extends TransactionEventRecord {
Map<String, String> headers = event.getHeaders();
ProtosFactory.FlumeEventHeader.Builder headerBuilder =
ProtosFactory.FlumeEventHeader.newBuilder();
- if(headers != null) {
- for(String key : headers.keySet()) {
+ if (headers != null) {
+ for (String key : headers.keySet()) {
String value = headers.get(key);
headerBuilder.clear();
eventBuilder.addHeaders(headerBuilder.setKey(key)
@@ -93,13 +93,12 @@ class Put extends TransactionEventRecord {
putBuilder.build().writeDelimitedTo(out);
}
@Override
- void readProtos(InputStream in) throws IOException,
- CorruptEventException {
- ProtosFactory.Put put = Preconditions.checkNotNull(ProtosFactory.
- Put.parseDelimitedFrom(in), "Put cannot be null");
+ void readProtos(InputStream in) throws IOException, CorruptEventException {
+ ProtosFactory.Put put = Preconditions.checkNotNull(
+ ProtosFactory.Put.parseDelimitedFrom(in), "Put cannot be null");
Map<String, String> headers = Maps.newHashMap();
ProtosFactory.FlumeEvent protosEvent = put.getEvent();
- for(ProtosFactory.FlumeEventHeader header : protosEvent.getHeadersList()) {
+ for (ProtosFactory.FlumeEventHeader header : protosEvent.getHeadersList()) {
headers.put(header.getKey(), header.getValue());
}
byte[] eventBody = protosEvent.getBody().toByteArray();
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
index a559503..662fd42 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
@@ -99,8 +99,8 @@ class ReplayHandler {
}
ReplayHandler(FlumeEventQueue queue,
- @Nullable KeyProvider encryptionKeyProvider,
- boolean fsyncPerTransaction) {
+ @Nullable KeyProvider encryptionKeyProvider,
+ boolean fsyncPerTransaction) {
this.queue = queue;
this.lastCheckpoint = queue.getLogWriteOrderID();
pendingTakes = Lists.newArrayList();
@@ -109,6 +109,7 @@ class ReplayHandler {
this.encryptionKeyProvider = encryptionKeyProvider;
this.fsyncPerTransaction = fsyncPerTransaction;
}
+
/**
* Replay logic from Flume1.2 which can be activated if the v2 logic
* is failing on ol logs for some reason.
@@ -165,9 +166,8 @@ class ReplayHandler {
commitCount++;
@SuppressWarnings("unchecked")
Collection<FlumeEventPointer> pointers =
- (Collection<FlumeEventPointer>) transactionMap.remove(trans);
- if (((Commit) record).getType()
- == TransactionEventRecord.Type.TAKE.get()) {
+ (Collection<FlumeEventPointer>) transactionMap.remove(trans);
+ if (((Commit) record).getType() == TransactionEventRecord.Type.TAKE.get()) {
if (inflightTakes.containsKey(trans)) {
if (pointers == null) {
pointers = Sets.newHashSet();
@@ -185,8 +185,8 @@ class ReplayHandler {
count += pointers.size();
}
} else {
- Preconditions.checkArgument(false, "Unknown record type: "
- + Integer.toHexString(type));
+ Preconditions.checkArgument(false,
+ "Unknown record type: " + Integer.toHexString(type));
}
} else {
@@ -196,8 +196,8 @@ class ReplayHandler {
LOG.info("Replayed " + count + " from " + log);
if (LOG.isDebugEnabled()) {
LOG.debug("read: " + readCount + ", put: " + putCount + ", take: "
- + takeCount + ", rollback: " + rollbackCount + ", commit: "
- + commitCount + ", skipp: " + skipCount);
+ + takeCount + ", rollback: " + rollbackCount + ", commit: "
+ + commitCount + ", skipp: " + skipCount);
}
} catch (EOFException e) {
LOG.warn("Hit EOF on " + log);
@@ -262,21 +262,20 @@ class ReplayHandler {
LOG.info("Replaying " + log);
try {
LogFile.SequentialReader reader =
- LogFileFactory.getSequentialReader(log, encryptionKeyProvider,
- fsyncPerTransaction);
+ LogFileFactory.getSequentialReader(log, encryptionKeyProvider, fsyncPerTransaction);
reader.skipToLastCheckpointPosition(queue.getLogWriteOrderID());
Preconditions.checkState(!readers.containsKey(reader.getLogFileID()),
"Readers " + readers + " already contains "
+ reader.getLogFileID());
readers.put(reader.getLogFileID(), reader);
LogRecord logRecord = reader.next();
- if(logRecord == null) {
+ if (logRecord == null) {
readers.remove(reader.getLogFileID());
reader.close();
} else {
logRecordBuffer.add(logRecord);
}
- } catch(EOFException e) {
+ } catch (EOFException e) {
LOG.warn("Ignoring " + log + " due to EOF", e);
}
}
@@ -294,7 +293,7 @@ class ReplayHandler {
writeOrderIDSeed = Math.max(writeOrderIDSeed,
record.getLogWriteOrderID());
readCount++;
- if(readCount % 10000 == 0 && readCount > 0) {
+ if (readCount % 10000 == 0 && readCount > 0) {
LOG.info("read: " + readCount + ", put: " + putCount + ", take: "
+ takeCount + ", rollback: " + rollbackCount + ", commit: "
+ commitCount + ", skip: " + skipCount + ", eventCount:" + count);
@@ -316,11 +315,11 @@ class ReplayHandler {
commitCount++;
@SuppressWarnings("unchecked")
Collection<FlumeEventPointer> pointers =
- (Collection<FlumeEventPointer>) transactionMap.remove(trans);
+ (Collection<FlumeEventPointer>) transactionMap.remove(trans);
if (((Commit) record).getType()
== TransactionEventRecord.Type.TAKE.get()) {
if (inflightTakes.containsKey(trans)) {
- if(pointers == null){
+ if (pointers == null) {
pointers = Sets.newHashSet();
}
Set<Long> takes = inflightTakes.removeAll(trans);
@@ -350,8 +349,8 @@ class ReplayHandler {
} finally {
TransactionIDOracle.setSeed(transactionIDSeed);
WriteOrderOracle.setSeed(writeOrderIDSeed);
- for(LogFile.SequentialReader reader : readers.values()) {
- if(reader != null) {
+ for (LogFile.SequentialReader reader : readers.values()) {
+ if (reader != null) {
reader.close();
}
}
@@ -378,11 +377,11 @@ class ReplayHandler {
}
private LogRecord next() throws IOException, CorruptEventException {
LogRecord resultLogRecord = logRecordBuffer.poll();
- if(resultLogRecord != null) {
+ if (resultLogRecord != null) {
// there is more log records to read
LogFile.SequentialReader reader = readers.get(resultLogRecord.getFileID());
LogRecord nextLogRecord;
- if((nextLogRecord = reader.next()) != null) {
+ if ((nextLogRecord = reader.next()) != null) {
logRecordBuffer.add(nextLogRecord);
}
}
@@ -391,7 +390,7 @@ class ReplayHandler {
private void processCommit(short type, Collection<FlumeEventPointer> pointers) {
if (type == TransactionEventRecord.Type.PUT.get()) {
for (FlumeEventPointer pointer : pointers) {
- if(!queue.addTail(pointer)) {
+ if (!queue.addTail(pointer)) {
throw new IllegalStateException("Unable to add "
+ pointer + ". Queue depth = " + queue.getSize()
+ ", Capacity = " + queue.getCapacity());
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
index 335ad0b..2fca755 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
@@ -35,6 +35,7 @@ class Rollback extends TransactionEventRecord {
Rollback(Long transactionID, Long logWriteOrderID) {
super(transactionID, logWriteOrderID);
}
+
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
@@ -44,22 +45,26 @@ class Rollback extends TransactionEventRecord {
public void write(DataOutput out) throws IOException {
super.write(out);
}
+
@Override
void writeProtos(OutputStream out) throws IOException {
ProtosFactory.Rollback.Builder rollbackBuilder =
ProtosFactory.Rollback.newBuilder();
rollbackBuilder.build().writeDelimitedTo(out);
}
+
@Override
void readProtos(InputStream in) throws IOException {
@SuppressWarnings("unused")
- ProtosFactory.Rollback rollback = Preconditions.checkNotNull(ProtosFactory.
- Rollback.parseDelimitedFrom(in), "Rollback cannot be null");
+ ProtosFactory.Rollback rollback = Preconditions.checkNotNull(
+ ProtosFactory.Rollback.parseDelimitedFrom(in), "Rollback cannot be null");
}
+
@Override
short getRecordType() {
return Type.ROLLBACK.get();
}
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
index a6eda75..19303cc 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
@@ -46,14 +46,12 @@ public class Serialization {
static final long SIZE_OF_INT = 4;
static final int SIZE_OF_LONG = 8;
-
static final int VERSION_2 = 2;
static final int VERSION_3 = 3;
public static final String METADATA_FILENAME = ".meta";
public static final String METADATA_TMP_FILENAME = ".tmp";
- public static final String OLD_METADATA_FILENAME = METADATA_FILENAME +
- ".old";
+ public static final String OLD_METADATA_FILENAME = METADATA_FILENAME + ".old";
// 64 K buffer to copy and compress files.
private static final int FILE_BUFFER_SIZE = 64 * 1024;
@@ -63,12 +61,11 @@ public class Serialization {
static File getMetaDataTempFile(File metaDataFile) {
String metaDataFileName = metaDataFile.getName() + METADATA_TMP_FILENAME;
return new File(metaDataFile.getParentFile(), metaDataFileName);
-
}
+
static File getMetaDataFile(File file) {
String metaDataFileName = file.getName() + METADATA_FILENAME;
return new File(file.getParentFile(), metaDataFileName);
-
}
// Support platforms that cannot do atomic renames - FLUME-1699
@@ -79,19 +76,20 @@ public class Serialization {
/**
* Deletes all files in given directory.
+ *
* @param checkpointDir - The directory whose files are to be deleted
- * @param excludes - Names of files which should not be deleted from this
- * directory.
+ * @param excludes - Names of files which should not be deleted from this
+ * directory.
* @return - true if all files were successfully deleted, false otherwise.
*/
static boolean deleteAllFiles(File checkpointDir,
- @Nullable Set<String> excludes) {
+ @Nullable Set<String> excludes) {
if (!checkpointDir.isDirectory()) {
return false;
}
File[] files = checkpointDir.listFiles();
- if(files == null) {
+ if (files == null) {
return false;
}
StringBuilder builder;
@@ -100,13 +98,13 @@ public class Serialization {
} else {
builder = new StringBuilder("Deleted the following files: ");
}
- if(excludes == null) {
+ if (excludes == null) {
excludes = Collections.emptySet();
}
for (File file : files) {
- if(excludes.contains(file.getName())) {
+ if (excludes.contains(file.getName())) {
LOG.info("Skipping " + file.getName() + " because it is in excludes " +
- "set");
+ "set");
continue;
}
if (!FileUtils.deleteQuietly(file)) {
@@ -125,18 +123,19 @@ public class Serialization {
/**
* Copy a file using a 64K size buffer. This method will copy the file and
* then fsync to disk
+ *
* @param from File to copy - this file should exist
- * @param to Destination file - this file should not exist
+ * @param to Destination file - this file should not exist
* @return true if the copy was successful
*/
public static boolean copyFile(File from, File to) throws IOException {
Preconditions.checkNotNull(from, "Source file is null, file copy failed.");
Preconditions.checkNotNull(to, "Destination file is null, " +
- "file copy failed.");
+ "file copy failed.");
Preconditions.checkState(from.exists(), "Source file: " + from.toString() +
- " does not exist.");
+ " does not exist.");
Preconditions.checkState(!to.exists(), "Destination file: "
- + to.toString() + " unexpectedly exists.");
+ + to.toString() + " unexpectedly exists.");
BufferedInputStream in = null;
RandomAccessFile out = null; //use a RandomAccessFile for easy fsync
@@ -145,7 +144,7 @@ public class Serialization {
out = new RandomAccessFile(to, "rw");
byte[] buf = new byte[FILE_BUFFER_SIZE];
int total = 0;
- while(true) {
+ while (true) {
int read = in.read(buf);
if (read == -1) {
break;
@@ -155,11 +154,11 @@ public class Serialization {
}
out.getFD().sync();
Preconditions.checkState(total == from.length(),
- "The size of the origin file and destination file are not equal.");
+ "The size of the origin file and destination file are not equal.");
return true;
} catch (Exception ex) {
LOG.error("Error while attempting to copy " + from.toString() + " to "
- + to.toString() + ".", ex);
+ + to.toString() + ".", ex);
Throwables.propagate(ex);
} finally {
Throwable th = null;
@@ -185,26 +184,26 @@ public class Serialization {
}
// Should never reach here.
throw new IOException("Copying file: " + from.toString() + " to: " + to
- .toString() + " may have failed.");
+ .toString() + " may have failed.");
}
/**
* Compress file using Snappy
+ *
* @param uncompressed File to compress - this file should exist
- * @param compressed Compressed file - this file should not exist
+ * @param compressed Compressed file - this file should not exist
* @return true if compression was successful
*/
public static boolean compressFile(File uncompressed, File compressed)
- throws IOException {
+ throws IOException {
Preconditions.checkNotNull(uncompressed,
- "Source file is null, compression failed.");
+ "Source file is null, compression failed.");
Preconditions.checkNotNull(compressed,
- "Destination file is null, compression failed.");
+ "Destination file is null, compression failed.");
Preconditions.checkState(uncompressed.exists(), "Source file: " +
- uncompressed.toString() + " does not exist.");
+ uncompressed.toString() + " does not exist.");
Preconditions.checkState(!compressed.exists(),
- "Compressed file: " + compressed.toString() + " unexpectedly " +
- "exists.");
+ "Compressed file: " + compressed.toString() + " unexpectedly " + "exists.");
BufferedInputStream in = null;
FileOutputStream out = null;
@@ -215,7 +214,7 @@ public class Serialization {
snappyOut = new SnappyOutputStream(out);
byte[] buf = new byte[FILE_BUFFER_SIZE];
- while(true) {
+ while (true) {
int read = in.read(buf);
if (read == -1) {
break;
@@ -226,8 +225,7 @@ public class Serialization {
return true;
} catch (Exception ex) {
LOG.error("Error while attempting to compress " +
- uncompressed.toString() + " to " + compressed.toString()
- + ".", ex);
+ uncompressed.toString() + " to " + compressed.toString() + ".", ex);
Throwables.propagate(ex);
} finally {
Throwable th = null;
@@ -253,26 +251,24 @@ public class Serialization {
}
// Should never reach here.
throw new IOException("Copying file: " + uncompressed.toString()
- + " to: " + compressed.toString() + " may have failed.");
+ + " to: " + compressed.toString() + " may have failed.");
}
/**
* Decompress file using Snappy
- * @param compressed File to compress - this file should exist
+ *
+ * @param compressed File to compress - this file should exist
* @param decompressed Compressed file - this file should not exist
* @return true if decompression was successful
*/
- public static boolean decompressFile(File compressed, File decompressed)
- throws IOException {
- Preconditions.checkNotNull(compressed,
- "Source file is null, decompression failed.");
+ public static boolean decompressFile(File compressed, File decompressed) throws IOException {
+ Preconditions.checkNotNull(compressed, "Source file is null, decompression failed.");
Preconditions.checkNotNull(decompressed, "Destination file is " +
- "null, decompression failed.");
+ "null, decompression failed.");
Preconditions.checkState(compressed.exists(), "Source file: " +
- compressed.toString() + " does not exist.");
+ compressed.toString() + " does not exist.");
Preconditions.checkState(!decompressed.exists(),
- "Decompressed file: " + decompressed.toString() +
- " unexpectedly exists.");
+ "Decompressed file: " + decompressed.toString() + " unexpectedly exists.");
BufferedInputStream in = null;
SnappyInputStream snappyIn = null;
@@ -283,7 +279,7 @@ public class Serialization {
out = new FileOutputStream(decompressed);
byte[] buf = new byte[FILE_BUFFER_SIZE];
- while(true) {
+ while (true) {
int read = snappyIn.read(buf);
if (read == -1) {
break;
@@ -294,8 +290,8 @@ public class Serialization {
return true;
} catch (Exception ex) {
LOG.error("Error while attempting to compress " +
- compressed.toString() + " to " + decompressed.toString() +
- ".", ex);
+ compressed.toString() + " to " + decompressed.toString() +
+ ".", ex);
Throwables.propagate(ex);
} finally {
Throwable th = null;
@@ -321,7 +317,7 @@ public class Serialization {
}
// Should never reach here.
throw new IOException("Decompressing file: " +
- compressed.toString() + " to: " + decompressed.toString() +
- " may have failed.");
+ compressed.toString() + " to: " + decompressed.toString() +
+ " may have failed.");
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
index 143143a..ee7fcc8 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
@@ -34,14 +34,17 @@ import com.google.common.base.Preconditions;
class Take extends TransactionEventRecord {
private int offset;
private int fileID;
+
Take(Long transactionID, Long logWriteOrderID) {
super(transactionID, logWriteOrderID);
}
+
Take(Long transactionID, Long logWriteOrderID, int offset, int fileID) {
this(transactionID, logWriteOrderID);
this.offset = offset;
this.fileID = fileID;
}
+
int getOffset() {
return offset;
}
@@ -70,17 +73,20 @@ class Take extends TransactionEventRecord {
takeBuilder.setOffset(offset);
takeBuilder.build().writeDelimitedTo(out);
}
+
@Override
void readProtos(InputStream in) throws IOException {
- ProtosFactory.Take take = Preconditions.checkNotNull(ProtosFactory.
- Take.parseDelimitedFrom(in), "Take cannot be null");
+ ProtosFactory.Take take = Preconditions.checkNotNull(
+ ProtosFactory.Take.parseDelimitedFrom(in), "Take cannot be null");
fileID = take.getFileID();
offset = take.getOffset();
}
+
@Override
short getRecordType() {
return Type.TAKE.get();
}
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
index 1eb3f4f..0f7c3c8 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
@@ -91,13 +91,16 @@ public abstract class TransactionEventRecord implements Writable {
COMMIT((short)4);
private short id;
+
Type(short id) {
this.id = id;
}
+
public short get() {
return id;
}
}
+
private static final ImmutableMap<Short, Constructor<? extends TransactionEventRecord>> TYPES;
static {
@@ -131,11 +134,11 @@ public abstract class TransactionEventRecord implements Writable {
dataOutput.flush();
// TODO toByteArray does an unneeded copy
return ByteBuffer.wrap(byteOutput.toByteArray());
- } catch(IOException e) {
+ } catch (IOException e) {
// near impossible
throw Throwables.propagate(e);
} finally {
- if(dataOutput != null) {
+ if (dataOutput != null) {
try {
dataOutput.close();
} catch (IOException e) {
@@ -149,7 +152,7 @@ public abstract class TransactionEventRecord implements Writable {
static TransactionEventRecord fromDataInputV2(DataInput in)
throws IOException {
int header = in.readInt();
- if(header != MAGIC_HEADER) {
+ if (header != MAGIC_HEADER) {
throw new IOException("Header " + Integer.toHexString(header) +
" is not the required value: " + Integer.toHexString(MAGIC_HEADER));
}
@@ -176,10 +179,10 @@ public abstract class TransactionEventRecord implements Writable {
ProtosFactory.TransactionEventFooter.newBuilder().build();
footer.writeDelimitedTo(byteOutput);
return ByteBuffer.wrap(byteOutput.toByteArray());
- } catch(IOException e) {
+ } catch (IOException e) {
throw Throwables.propagate(e);
} finally {
- if(byteOutput != null) {
+ if (byteOutput != null) {
try {
byteOutput.close();
} catch (IOException e) {
@@ -194,23 +197,19 @@ public abstract class TransactionEventRecord implements Writable {
throws IOException, CorruptEventException {
ByteArrayInputStream in = new ByteArrayInputStream(buffer);
try {
- ProtosFactory.TransactionEventHeader header = Preconditions.
- checkNotNull(ProtosFactory.TransactionEventHeader.
- parseDelimitedFrom(in), "Header cannot be null");
+ ProtosFactory.TransactionEventHeader header = Preconditions.checkNotNull(
+ ProtosFactory.TransactionEventHeader.parseDelimitedFrom(in), "Header cannot be null");
short type = (short)header.getType();
long transactionID = header.getTransactionID();
long writeOrderID = header.getWriteOrderID();
- TransactionEventRecord transactionEvent =
- newRecordForType(type, transactionID, writeOrderID);
+ TransactionEventRecord transactionEvent = newRecordForType(type, transactionID, writeOrderID);
transactionEvent.readProtos(in);
@SuppressWarnings("unused")
ProtosFactory.TransactionEventFooter footer = Preconditions.checkNotNull(
- ProtosFactory.TransactionEventFooter.
- parseDelimitedFrom(in), "Footer cannot be null");
+ ProtosFactory.TransactionEventFooter.parseDelimitedFrom(in), "Footer cannot be null");
return transactionEvent;
} catch (InvalidProtocolBufferException ex) {
- throw new CorruptEventException(
- "Could not parse event from data file.", ex);
+ throw new CorruptEventException("Could not parse event from data file.", ex);
} finally {
try {
in.close();
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionIDOracle.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionIDOracle.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionIDOracle.java
index a9f6be6..12e5c7d 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionIDOracle.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionIDOracle.java
@@ -23,15 +23,17 @@ import java.util.concurrent.atomic.AtomicLong;
public final class TransactionIDOracle {
private TransactionIDOracle() {}
+
private static final AtomicLong TRANSACTION_ID =
new AtomicLong(System.currentTimeMillis());
public static void setSeed(long highest) {
long previous;
- while(highest > (previous = TRANSACTION_ID.get())) {
+ while (highest > (previous = TRANSACTION_ID.get())) {
TRANSACTION_ID.compareAndSet(previous, highest);
}
}
+
public static long next() {
return TRANSACTION_ID.incrementAndGet();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java
index 69072db..2ebd42d 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java
@@ -75,7 +75,7 @@ class WritableUtils {
long tmp = i;
while (tmp != 0) {
tmp = tmp >> 8;
- len--;
+ len--;
}
stream.writeByte((byte)len);
@@ -92,8 +92,8 @@ class WritableUtils {
/**
* Reads a zero-compressed encoded long from input stream and returns it.
* @param stream Binary input stream
- * @throws java.io.IOException
* @return deserialized long from stream.
+ * @throws java.io.IOException
*/
public static long readVLong(DataInput stream) throws IOException {
byte firstByte = stream.readByte();
@@ -102,7 +102,7 @@ class WritableUtils {
return firstByte;
}
long i = 0;
- for (int idx = 0; idx < len-1; idx++) {
+ for (int idx = 0; idx < len - 1; idx++) {
byte b = stream.readByte();
i = i << 8;
i = i | (b & 0xFF);
@@ -113,8 +113,8 @@ class WritableUtils {
/**
* Reads a zero-compressed encoded integer from input stream and returns it.
* @param stream Binary input stream
- * @throws java.io.IOException
* @return deserialized integer from stream.
+ * @throws java.io.IOException
*/
public static int readVInt(DataInput stream) throws IOException {
long n = readVLong(stream);
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WriteOrderOracle.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WriteOrderOracle.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WriteOrderOracle.java
index dbf1c1e..b26cbb4 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WriteOrderOracle.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WriteOrderOracle.java
@@ -23,15 +23,17 @@ import java.util.concurrent.atomic.AtomicLong;
public final class WriteOrderOracle {
private WriteOrderOracle() {}
+
private static final AtomicLong WRITER_ORDERER =
new AtomicLong(System.currentTimeMillis());
public static void setSeed(long highest) {
long previous;
- while(highest > (previous = WRITER_ORDERER.get())) {
+ while (highest > (previous = WRITER_ORDERER.get())) {
WRITER_ORDERER.compareAndSet(previous, highest);
}
}
+
public static long next() {
return WRITER_ORDERER.incrementAndGet();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java
index 9ee4245..e1116d2 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java
@@ -46,7 +46,8 @@ public class AESCTRNoPaddingProvider extends CipherProvider {
}
public static class EncryptorBuilder
- extends CipherProvider.Encryptor.Builder<AESCTRNoPaddingEncryptor> {
+ extends CipherProvider.Encryptor.Builder<AESCTRNoPaddingEncryptor> {
+
@Override
public AESCTRNoPaddingEncryptor build() {
ByteBuffer buffer = ByteBuffer.allocate(16);
@@ -58,9 +59,8 @@ public class AESCTRNoPaddingProvider extends CipherProvider {
}
}
-
public static class DecryptorBuilder
- extends CipherProvider.Decryptor.Builder<AESCTRNoPaddingDecryptor> {
+ extends CipherProvider.Decryptor.Builder<AESCTRNoPaddingDecryptor> {
@Override
public AESCTRNoPaddingDecryptor build() {
return new AESCTRNoPaddingDecryptor(key, parameters);
@@ -70,18 +70,22 @@ public class AESCTRNoPaddingProvider extends CipherProvider {
private static class AESCTRNoPaddingEncryptor extends Encryptor {
private byte[] parameters;
private Cipher cipher;
+
private AESCTRNoPaddingEncryptor(Key key, byte[] parameters) {
this.parameters = parameters;
cipher = getCipher(key, Cipher.ENCRYPT_MODE, parameters);
}
+
@Override
public byte[] getParameters() {
return parameters;
}
+
@Override
public String getCodec() {
return TYPE;
}
+
@Override
public byte[] encrypt(byte[] clearText) {
return doFinal(cipher, clearText);
@@ -90,21 +94,23 @@ public class AESCTRNoPaddingProvider extends CipherProvider {
private static class AESCTRNoPaddingDecryptor extends Decryptor {
private Cipher cipher;
+
private AESCTRNoPaddingDecryptor(Key key, byte[] parameters) {
cipher = getCipher(key, Cipher.DECRYPT_MODE, parameters);
}
+
@Override
public byte[] decrypt(byte[] cipherText) {
return doFinal(cipher, cipherText);
}
+
@Override
public String getCodec() {
return TYPE;
}
}
- private static byte[] doFinal(Cipher cipher, byte[] input)
- throws DecryptionFailureException{
+ private static byte[] doFinal(Cipher cipher, byte[] input) throws DecryptionFailureException {
try {
return cipher.doFinal(input);
} catch (Exception e) {