You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/06/13 02:36:34 UTC
[3/3] git commit: FLUME-1586: File Channel should support verifying
integrity of individual events.
FLUME-1586: File Channel should support verifying integrity of individual events.
(Hari Shreedharan via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c57ebd1d
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c57ebd1d
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c57ebd1d
Branch: refs/heads/trunk
Commit: c57ebd1d2296fd1c0c8f84eecb16799de2a73e35
Parents: 791f443
Author: Brock Noland <br...@apache.org>
Authored: Wed Jun 12 17:35:38 2013 -0700
Committer: Brock Noland <br...@apache.org>
Committed: Wed Jun 12 17:35:38 2013 -0700
----------------------------------------------------------------------
bin/flume-ng | 6 +
.../channel/file/CorruptEventException.java | 36 +
.../apache/flume/channel/file/FileChannel.java | 47 +-
.../java/org/apache/flume/channel/file/Log.java | 12 +-
.../org/apache/flume/channel/file/LogFile.java | 104 +-
.../apache/flume/channel/file/LogFileV3.java | 18 +-
.../flume/channel/file/NoopRecordException.java | 35 +
.../java/org/apache/flume/channel/file/Put.java | 38 +-
.../flume/channel/file/ReplayHandler.java | 3 +-
.../flume/channel/file/Serialization.java | 15 +-
.../channel/file/TransactionEventRecord.java | 10 +-
.../flume/channel/file/proto/ProtosFactory.java | 1127 +++++++++---------
.../src/main/proto/filechannel.proto | 1 +
.../flume/channel/file/TestFileChannel.java | 44 +-
.../org/apache/flume/channel/file/TestLog.java | 28 +-
.../apache/flume/channel/file/TestLogFile.java | 111 +-
.../file/TestTransactionEventRecordV3.java | 16 +-
.../apache/flume/channel/file/TestUtils.java | 55 +-
flume-ng-dist/pom.xml | 4 +
flume-ng-dist/src/main/assembly/bin.xml | 1 +
flume-ng-dist/src/main/assembly/src.xml | 2 +
flume-tools/pom.xml | 143 +++
.../flume/tools/FileChannelIntegrityTool.java | 142 +++
.../java/org/apache/flume/tools/FlumeTool.java | 24 +
.../org/apache/flume/tools/FlumeToolType.java | 40 +
.../org/apache/flume/tools/FlumeToolsMain.java | 68 ++
.../tools/TestFileChannelIntegrityTool.java | 247 ++++
pom.xml | 7 +
28 files changed, 1765 insertions(+), 619 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/bin/flume-ng
----------------------------------------------------------------------
diff --git a/bin/flume-ng b/bin/flume-ng
index 22b95b8..65cc985 100755
--- a/bin/flume-ng
+++ b/bin/flume-ng
@@ -26,6 +26,7 @@
FLUME_AGENT_CLASS="org.apache.flume.node.Application"
FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
+FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"
CLEAN_FLAG=1
################################
@@ -261,6 +262,9 @@ case "$mode" in
avro-client)
opt_avro_client=1
;;
+ tool)
+ opt_tool=1
+ ;;
version)
opt_version=1
CLEAN_FLAG=0
@@ -433,6 +437,8 @@ elif [ -n "$opt_avro_client" ] ; then
run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
run_flume $FLUME_VERSION_CLASS $args
+elif [ -n "${opt_tool}" ] ; then
+ run_flume $FLUME_TOOLS_CLASS $args
else
error "This message should never appear" 1
fi
http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java
new file mode 100644
index 0000000..691d291
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+
+/**
+ * Checked exception indicating that an event read from a File Channel data
+ * file is corrupt, e.g. the checksum computed over the event body does not
+ * match the checksum stored with the record.
+ */
+public class CorruptEventException extends Exception {
+
+ private static final long serialVersionUID = -2986946303540798416L;
+ /** Creates an exception with no detail message and no cause. */
+ public CorruptEventException() {
+ super();
+ }
+
+ /**
+  * @param msg detail message describing the corruption
+  */
+ public CorruptEventException(String msg) {
+ super(msg);
+ }
+
+ /**
+  * @param msg detail message describing the corruption
+  * @param th underlying cause
+  */
+ public CorruptEventException(String msg, Throwable th) {
+ super(msg, th);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index cc0d38a..36f150b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -100,6 +100,7 @@ public class FileChannel extends BasicChannelSemantics {
private String encryptionActiveKey;
private String encryptionCipherProvider;
private boolean useDualCheckpoints;
+ private boolean isTest = false;
@Override
public synchronized void setName(String name) {
@@ -451,6 +452,7 @@ public class FileChannel extends BasicChannelSemantics {
private String getStateAsString() {
return String.valueOf(getState());
}
+
@Override
protected void doPut(Event event) throws InterruptedException {
channelCounter.incrementEventPutAttemptCount();
@@ -511,23 +513,40 @@ public class FileChannel extends BasicChannelSemantics {
+ "log. Try increasing the log write timeout value. " +
channelNameDescriptor);
}
+
+ /*
+ * 1. Take an event which is in the queue.
+ * 2. If getting that event does not throw NoopRecordException,
+ * then return it.
+ * 3. Else try to retrieve the next event from the queue
+ * 4. Repeat 2 and 3 until queue is empty or an event is returned.
+ */
+
try {
- FlumeEventPointer ptr = queue.removeHead(transactionID);
- if(ptr != null) {
- try {
- // first add to takeList so that if write to disk
- // fails rollback actually does it's work
- Preconditions.checkState(takeList.offer(ptr), "takeList offer failed "
- + channelNameDescriptor);
- log.take(transactionID, ptr); // write take to disk
- Event event = log.get(ptr);
- return event;
- } catch (IOException e) {
- throw new ChannelException("Take failed due to IO error "
- + channelNameDescriptor, e);
+ while (true) {
+ FlumeEventPointer ptr = queue.removeHead(transactionID);
+ if (ptr == null) {
+ return null;
+ } else {
+ try {
+ // first add to takeList so that if write to disk
+ // fails rollback actually does it's work
+ Preconditions.checkState(takeList.offer(ptr),
+ "takeList offer failed "
+ + channelNameDescriptor);
+ log.take(transactionID, ptr); // write take to disk
+ Event event = log.get(ptr);
+ return event;
+ } catch (IOException e) {
+ throw new ChannelException("Take failed due to IO error "
+ + channelNameDescriptor, e);
+ } catch (NoopRecordException e) {
+ LOG.warn("Corrupt record replaced by File Channel Integrity " +
+ "tool found. Will retrieve next event", e);
+ takeList.remove(ptr);
+ }
}
}
- return null;
} finally {
log.unlockShared();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 1918baa..8dc0ff8 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -73,7 +73,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-class Log {
+public class Log {
public static final String PREFIX = "log-";
private static final Logger LOGGER = LoggerFactory.getLogger(Log.class);
private static final int MIN_NUM_LOGS = 2;
@@ -571,12 +571,18 @@ class Log {
* @throws InterruptedException
*/
FlumeEvent get(FlumeEventPointer pointer) throws IOException,
- InterruptedException {
+ InterruptedException, NoopRecordException {
Preconditions.checkState(open, "Log is closed");
int id = pointer.getFileID();
LogFile.RandomReader logFile = idLogFileMap.get(id);
Preconditions.checkNotNull(logFile, "LogFile is null for id " + id);
- return logFile.get(pointer.getOffset());
+ try {
+ return logFile.get(pointer.getOffset());
+ } catch (CorruptEventException ex) {
+ open = false;
+ throw new IOException("Corrupt event found. Please run File Channel " +
+ "Integrity tool.", ex);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
index d3db896..bb8ce1a 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
@@ -22,6 +22,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.apache.flume.ChannelException;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.channel.file.encryption.CipherProvider;
import org.apache.flume.channel.file.encryption.KeyProvider;
import org.apache.flume.tools.DirectMemoryUtils;
@@ -31,6 +34,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
@@ -40,7 +44,9 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
-abstract class LogFile {
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class LogFile {
private static final Logger LOG = LoggerFactory
.getLogger(LogFile.class);
@@ -54,8 +60,9 @@ abstract class LogFile {
private static final ByteBuffer FILL = DirectMemoryUtils.
allocate(1024 * 1024); // preallocation, 1MB
- protected static final byte OP_RECORD = Byte.MAX_VALUE;
- protected static final byte OP_EOF = Byte.MIN_VALUE;
+ public static final byte OP_RECORD = Byte.MAX_VALUE;
+ public static final byte OP_NOOP = (Byte.MAX_VALUE + Byte.MIN_VALUE)/2;
+ public static final byte OP_EOF = Byte.MIN_VALUE;
static {
for (int i = 0; i < FILL.capacity(); i++) {
@@ -63,6 +70,13 @@ abstract class LogFile {
}
}
+ /**
+  * Moves {@code fileHandle} past one length-prefixed record body.
+  * Seeks to {@code offset}, reads a 4-byte length, then skips that many bytes.
+  *
+  * @param fileHandle open handle on the log file; its file pointer is moved
+  * @param offset position of the record's length prefix, i.e. the byte
+  *        immediately after the record's operation byte
+  * @throws IOException if seeking or reading the length fails
+  */
+ protected static void skipRecord(RandomAccessFile fileHandle,
+ int offset) throws IOException {
+ fileHandle.seek(offset);
+ int length = fileHandle.readInt();
+ // NOTE(review): RandomAccessFile.skipBytes may skip fewer bytes than
+ // requested (end of file) and skips nothing for a negative length; the
+ // return value is not checked here.
+ fileHandle.skipBytes(length);
+ }
+
abstract static class MetaDataWriter {
private final File file;
private final int logFileID;
@@ -296,6 +310,48 @@ abstract class LogFile {
}
}
+  /**
+   * Rewrites the operation byte of individual records in an existing log
+   * file so that they are marked as {@link #OP_NOOP}.
+   *
+   * <p>This is a class meant to be an internal Flume API, and can change at
+   * any time. Intended to be used only from the File Channel Integrity
+   * test tool. Not to be used for any other purpose.
+   */
+  public static class OperationRecordUpdater {
+    private final RandomAccessFile fileHandle;
+    private final File file;
+
+    /**
+     * Opens {@code file} in read/write mode.
+     *
+     * @param file existing log file whose records are to be updated
+     * @throws IllegalStateException if {@code file} does not exist
+     * @throws FileNotFoundException if the file cannot be opened for
+     *         reading and writing
+     */
+    public OperationRecordUpdater(File file) throws FileNotFoundException {
+      Preconditions.checkState(file.exists(), "File to update, " +
+          file.toString() + " does not exist.");
+      this.file = file;
+      fileHandle = new RandomAccessFile(file, "rw");
+    }
+
+    /**
+     * Overwrites the operation byte at {@code offset} with {@link #OP_NOOP}.
+     *
+     * @param offset position of the record's operation byte in the file
+     * @throws IllegalStateException if the byte currently stored at
+     *         {@code offset} is neither {@link #OP_RECORD} nor
+     *         {@link #OP_NOOP}
+     * @throws IOException if seeking, reading or writing fails
+     */
+    public void markRecordAsNoop(long offset) throws IOException {
+      // First ensure that the offset actually is an OP_RECORD. There is a
+      // small possibility that it still is OP_RECORD,
+      // but is not actually the beginning of a record. Is there anything we
+      // can do about it?
+      fileHandle.seek(offset);
+      byte byteRead = fileHandle.readByte();
+      // Report the byte actually found: it may be EOF or any other value.
+      Preconditions.checkState(byteRead == OP_RECORD || byteRead == OP_NOOP,
+          "Expected to read a record but the byte read at offset %s was 0x%s",
+          offset, Integer.toHexString(byteRead));
+      // readByte() advanced the file pointer; go back to overwrite the byte.
+      fileHandle.seek(offset);
+      LOG.info("Marking event as " + OP_NOOP + " at " + offset + " for file " +
+          file.toString());
+      fileHandle.writeByte(OP_NOOP);
+    }
+
+    /**
+     * Syncs pending writes to disk and closes the file handle. Failures are
+     * logged, not thrown.
+     */
+    public void close() {
+      try {
+        try {
+          fileHandle.getFD().sync();
+        } finally {
+          // Always release the descriptor, even when the sync fails.
+          fileHandle.close();
+        }
+      } catch (IOException e) {
+        LOG.error("Could not close file handle to file " +
+            file.toString(), e);
+      }
+    }
+  }
+
static abstract class RandomReader {
private final File file;
private final BlockingQueue<RandomAccessFile> readFileHandles =
@@ -311,7 +367,7 @@ abstract class LogFile {
}
protected abstract TransactionEventRecord doGet(RandomAccessFile fileHandle)
- throws IOException;
+ throws IOException, CorruptEventException;
abstract int getVersion();
@@ -323,13 +379,18 @@ abstract class LogFile {
return encryptionKeyProvider;
}
- FlumeEvent get(int offset) throws IOException, InterruptedException {
+ FlumeEvent get(int offset) throws IOException, InterruptedException,
+ CorruptEventException, NoopRecordException {
Preconditions.checkState(open, "File closed");
RandomAccessFile fileHandle = checkOut();
boolean error = true;
try {
fileHandle.seek(offset);
byte operation = fileHandle.readByte();
+ if(operation == OP_NOOP) {
+ throw new NoopRecordException("No op record found. Corrupt record " +
+ "may have been repaired by File Channel Integrity tool");
+ }
Preconditions.checkState(operation == OP_RECORD,
Integer.toHexString(operation));
TransactionEventRecord record = doGet(fileHandle);
@@ -408,7 +469,7 @@ abstract class LogFile {
}
}
- static abstract class SequentialReader {
+ public static abstract class SequentialReader {
private final RandomAccessFile fileHandle;
private final FileChannel fileChannel;
@@ -434,7 +495,7 @@ abstract class LogFile {
fileHandle = new RandomAccessFile(file, "r");
fileChannel = fileHandle.getChannel();
}
- abstract LogRecord doNext(int offset) throws IOException;
+ abstract LogRecord doNext(int offset) throws IOException, CorruptEventException;
abstract int getVersion();
@@ -488,7 +549,7 @@ abstract class LogFile {
}
}
- LogRecord next() throws IOException {
+ public LogRecord next() throws IOException, CorruptEventException {
int offset = -1;
try {
long position = fileChannel.position();
@@ -499,14 +560,26 @@ abstract class LogFile {
}
offset = (int) position;
Preconditions.checkState(offset >= 0);
- byte operation = fileHandle.readByte();
- if(operation != OP_RECORD) {
- if(operation == OP_EOF) {
+ while (offset < fileHandle.length()) {
+ byte operation = fileHandle.readByte();
+ if (operation == OP_RECORD) {
+ break;
+ } else if (operation == OP_EOF) {
LOG.info("Encountered EOF at " + offset + " in " + file);
+ return null;
+ } else if (operation == OP_NOOP) {
+ LOG.info("No op event found in file: " + file.toString() +
+ " at " + offset + ". Skipping event.");
+ skipRecord(fileHandle, offset + 1);
+ offset = (int) fileHandle.getFilePointer();
+ continue;
} else {
LOG.error("Encountered non op-record at " + offset + " " +
- Integer.toHexString(operation) + " in " + file);
+ Integer.toHexString(operation) + " in " + file);
+ return null;
}
+ }
+ if(offset >= fileHandle.length()) {
return null;
}
return doNext(offset);
@@ -518,7 +591,10 @@ abstract class LogFile {
}
}
- void close() {
+ /**
+  * @return the current position of this reader's file channel
+  * @throws IOException if the position cannot be obtained from the channel
+  */
+ public long getPosition() throws IOException {
+ return fileChannel.position();
+ }
+ public void close() {
if(fileHandle != null) {
try {
fileHandle.close();
@@ -540,7 +616,7 @@ abstract class LogFile {
return buffer;
}
- public static void main(String[] args) throws EOFException, IOException {
+ public static void main(String[] args) throws EOFException, IOException, CorruptEventException {
File file = new File(args[0]);
LogFile.SequentialReader reader = null;
try {
http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
index d9a2a9b..38f6ecb 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
@@ -21,6 +21,8 @@ package org.apache.flume.channel.file;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessage;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.channel.file.encryption.CipherProvider;
import org.apache.flume.channel.file.encryption.CipherProviderFactory;
import org.apache.flume.channel.file.encryption.KeyProvider;
@@ -43,7 +45,9 @@ import java.util.concurrent.LinkedBlockingDeque;
* Represents a single data file on disk. Has methods to write,
* read sequentially (replay), and read randomly (channel takes).
*/
-class LogFileV3 extends LogFile {
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class LogFileV3 extends LogFile {
protected static final Logger LOGGER =
LoggerFactory.getLogger(LogFileV3.class);
@@ -267,7 +271,7 @@ class LogFileV3 extends LogFile {
}
@Override
protected TransactionEventRecord doGet(RandomAccessFile fileHandle)
- throws IOException {
+ throws IOException, CorruptEventException {
// readers are opened right when the file is created and thus
// empty. As such we wait to initialize until there is some
// data before we we initialize
@@ -297,10 +301,11 @@ class LogFileV3 extends LogFile {
}
}
- static class SequentialReader extends LogFile.SequentialReader {
+ public static class SequentialReader extends LogFile.SequentialReader {
private CipherProvider.Decryptor decryptor;
- SequentialReader(File file, @Nullable KeyProvider encryptionKeyProvider)
- throws EOFException, IOException {
+
+ public SequentialReader(File file, @Nullable KeyProvider
+ encryptionKeyProvider) throws EOFException, IOException {
super(file, encryptionKeyProvider);
File metaDataFile = Serialization.getMetaDataFile(file);
FileInputStream inputStream = new FileInputStream(metaDataFile);
@@ -344,8 +349,9 @@ class LogFileV3 extends LogFile {
public int getVersion() {
return Serialization.VERSION_3;
}
+
@Override
- LogRecord doNext(int offset) throws IOException {
+ LogRecord doNext(int offset) throws IOException, CorruptEventException {
byte[] buffer = readDelimitedBuffer(getFileHandle());
if(decryptor != null) {
buffer = decryptor.decrypt(buffer);
http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/NoopRecordException.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/NoopRecordException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/NoopRecordException.java
new file mode 100644
index 0000000..5f446b8
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/NoopRecordException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+/**
+ * Checked exception indicating that the record being read is a no-op
+ * record rather than a regular operation record.
+ */
+public class NoopRecordException extends Exception {
+ private static final long serialVersionUID = -7394180633208889738L;
+
+ /** Creates an exception with no detail message and no cause. */
+ public NoopRecordException() {
+ super();
+ }
+
+ /**
+  * @param msg detail message describing where the no-op record was found
+  */
+ public NoopRecordException(String msg) {
+ super(msg);
+ }
+
+ /**
+  * @param msg detail message describing where the no-op record was found
+  * @param th underlying cause
+  */
+ public NoopRecordException(String msg, Throwable th) {
+ super(msg, th);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
index 4235a79..f08f024 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
@@ -24,7 +24,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.flume.channel.file.proto.ProtosFactory;
import com.google.common.base.Preconditions;
@@ -36,13 +39,19 @@ import com.google.protobuf.ByteString;
*/
class Put extends TransactionEventRecord {
private FlumeEvent event;
+ // Should we move this to a higher level to not make multiple instances?
+ // Doing that might cause performance issues, since access to this would
+ // need to be synchronized (the whole reset-update-getValue cycle would
+ // need to be).
+ private final Checksum checksum = new CRC32();
+ @VisibleForTesting
Put(Long transactionID, Long logWriteOrderID) {
- super(transactionID, logWriteOrderID);
+ this(transactionID, logWriteOrderID, null);
}
Put(Long transactionID, Long logWriteOrderID, FlumeEvent event) {
- this(transactionID, logWriteOrderID);
+ super(transactionID, logWriteOrderID);
this.event = event;
}
@@ -78,11 +87,14 @@ class Put extends TransactionEventRecord {
}
}
eventBuilder.setBody(ByteString.copyFrom(event.getBody()));
- putBuilder.setEvent(eventBuilder.build());
+ ProtosFactory.FlumeEvent protoEvent = eventBuilder.build();
+ putBuilder.setEvent(protoEvent);
+ putBuilder.setChecksum(calculateChecksum(event.getBody()));
putBuilder.build().writeDelimitedTo(out);
}
@Override
- void readProtos(InputStream in) throws IOException {
+ void readProtos(InputStream in) throws IOException,
+ CorruptEventException {
ProtosFactory.Put put = Preconditions.checkNotNull(ProtosFactory.
Put.parseDelimitedFrom(in), "Put cannot be null");
Map<String, String> headers = Maps.newHashMap();
@@ -90,9 +102,25 @@ class Put extends TransactionEventRecord {
for(ProtosFactory.FlumeEventHeader header : protosEvent.getHeadersList()) {
headers.put(header.getKey(), header.getValue());
}
+ byte[] eventBody = protosEvent.getBody().toByteArray();
+
+ if (put.hasChecksum()) {
+ long eventBodyChecksum = calculateChecksum(eventBody);
+ if (eventBodyChecksum != put.getChecksum()) {
+ throw new CorruptEventException("Expected checksum for event was " +
+ eventBodyChecksum + " but the checksum of the event is " + put.getChecksum());
+ }
+ }
// TODO when we remove v2, remove FlumeEvent and use EventBuilder here
- event = new FlumeEvent(headers, protosEvent.getBody().toByteArray());
+ event = new FlumeEvent(headers, eventBody);
+ }
+
+ /**
+  * Computes the checksum of {@code body} using this record's
+  * {@code checksum} instance, which is reset before the bytes are fed in.
+  *
+  * @param body event body bytes; must not be null since its length is read
+  * @return the checksum value after processing all of {@code body}
+  */
+ protected long calculateChecksum(byte[] body) {
+ // NOTE(review): reset/update/getValue mutate the shared per-instance
+ // checksum, so concurrent calls on one record would presumably interfere.
+ checksum.reset();
+ checksum.update(body, 0, body.length);
+ return checksum.getValue();
}
+
@Override
public short getRecordType() {
return Type.PUT.get();
http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
index fc47b23..c8f5fdd 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
@@ -26,6 +26,7 @@ import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import org.apache.commons.collections.MultiMap;
import org.apache.commons.collections.map.MultiValueMap;
+import org.apache.flume.ChannelException;
import org.apache.flume.channel.file.encryption.KeyProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -373,7 +374,7 @@ class ReplayHandler {
}
}
}
- private LogRecord next() throws IOException {
+ private LogRecord next() throws IOException, CorruptEventException {
LogRecord resultLogRecord = logRecordBuffer.poll();
if(resultLogRecord != null) {
// there is more log records to read
http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
index d6897e1..f8160d9 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
@@ -21,6 +21,8 @@ package org.apache.flume.channel.file;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.apache.commons.io.FileUtils;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +35,9 @@ import java.io.RandomAccessFile;
import java.util.Collections;
import java.util.Set;
-class Serialization {
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class Serialization {
private Serialization() {}
static final long SIZE_OF_INT = 4;
@@ -43,9 +47,10 @@ class Serialization {
static final int VERSION_2 = 2;
static final int VERSION_3 = 3;
- static final String METADATA_FILENAME = ".meta";
- static final String METADATA_TMP_FILENAME = ".tmp";
- static final String OLD_METADATA_FILENAME = METADATA_FILENAME + ".old";
+ public static final String METADATA_FILENAME = ".meta";
+ public static final String METADATA_TMP_FILENAME = ".tmp";
+ public static final String OLD_METADATA_FILENAME = METADATA_FILENAME +
+ ".old";
// 64 K buffer to copy files.
private static final int FILE_COPY_BUFFER_SIZE = 64 * 1024;
@@ -121,7 +126,7 @@ class Serialization {
* @param to Destination file - this file should not exist
* @return true if the copy was successful
*/
- static boolean copyFile(File from, File to) throws IOException {
+ public static boolean copyFile(File from, File to) throws IOException {
Preconditions.checkNotNull(from, "Source file is null, file copy failed.");
Preconditions.checkNotNull(to, "Destination file is null, " +
"file copy failed.");
http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
index 073042f..dda9b3f 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
@@ -29,6 +29,8 @@ import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.nio.ByteBuffer;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.channel.file.proto.ProtosFactory;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
@@ -41,7 +43,9 @@ import com.google.common.collect.ImmutableMap;
/**
* Base class for records in data file: Put, Take, Rollback, Commit
*/
-abstract class TransactionEventRecord implements Writable {
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class TransactionEventRecord implements Writable {
private static final Logger LOG = LoggerFactory
.getLogger(TransactionEventRecord.class);
private final long transactionID;
@@ -63,7 +67,7 @@ abstract class TransactionEventRecord implements Writable {
abstract void writeProtos(OutputStream out) throws IOException;
- abstract void readProtos(InputStream in) throws IOException;
+ abstract void readProtos(InputStream in) throws IOException, CorruptEventException;
long getLogWriteOrderID() {
return logWriteOrderID;
@@ -187,7 +191,7 @@ abstract class TransactionEventRecord implements Writable {
static TransactionEventRecord fromByteArray(byte[] buffer)
- throws IOException {
+ throws IOException, CorruptEventException {
ByteArrayInputStream in = new ByteArrayInputStream(buffer);
try {
ProtosFactory.TransactionEventHeader header = Preconditions.