You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ro...@apache.org on 2015/12/17 19:42:09 UTC
flume git commit: FLUME-2801. Performance improvement on TailDir source
Repository: flume
Updated Branches:
refs/heads/trunk 88b3fee10 -> 0421fa2ab
FLUME-2801. Performance improvement on TailDir source
(Jun Seok Hong via Satoshi Iijima and Roshan Naik)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0421fa2a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0421fa2a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0421fa2a
Branch: refs/heads/trunk
Commit: 0421fa2ab1eb9575b34bbb2f44e8c6d83842eaeb
Parents: 88b3fee
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Thu Dec 17 10:35:56 2015 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Dec 17 10:39:49 2015 -0800
----------------------------------------------------------------------
.../taildir/ReliableTaildirEventReader.java | 4 +-
.../apache/flume/source/taildir/TailFile.java | 129 ++++++++++++++-----
2 files changed, 101 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/0421fa2a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
index 951b786..5b6d465 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
@@ -195,7 +195,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
}
logger.info("Last read was never committed - resetting position");
long lastPos = currentFile.getPos();
- currentFile.getRaf().seek(lastPos);
+ currentFile.updateFilePos(lastPos);
}
List<Event> events = currentFile.readEvents(numEvents, backoffWithoutNL, addByteOffset);
if (events.isEmpty()) {
@@ -223,7 +223,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
@Override
public void commit() throws IOException {
if (!committed && currentFile != null) {
- long pos = currentFile.getRaf().getFilePointer();
+ long pos = currentFile.getLineReadPos();
currentFile.setPos(pos);
currentFile.setLastUpdated(updateTime);
committed = true;
http://git-wip-us.apache.org/repos/asf/flume/blob/0421fa2a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
index 99683da..eabd357 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
@@ -28,22 +28,21 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang.StringUtils;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
public class TailFile {
private static final Logger logger = LoggerFactory.getLogger(TailFile.class);
- private static final String LINE_SEP = "\n";
- private static final String LINE_SEP_WIN = "\r\n";
+ private static final byte BYTE_NL = (byte) 10;
+ private static final byte BYTE_CR = (byte) 13;
+
+ private final static int BUFFER_SIZE = 8192;
+ private final static int NEED_READING = -1;
private RandomAccessFile raf;
private final String path;
@@ -52,17 +51,26 @@ public class TailFile {
private long lastUpdated;
private boolean needTail;
private final Map<String, String> headers;
+ private byte[] buffer;
+ private byte[] oldBuffer;
+ private int bufferPos;
+ private long lineReadPos;
public TailFile(File file, Map<String, String> headers, long inode, long pos)
throws IOException {
this.raf = new RandomAccessFile(file, "r");
- if (pos > 0) raf.seek(pos);
+ if (pos > 0) {
+ raf.seek(pos);
+ lineReadPos=pos;
+ }
this.path = file.getAbsolutePath();
this.inode = inode;
this.pos = pos;
this.lastUpdated = 0L;
this.needTail = true;
this.headers = headers;
+ this.oldBuffer = new byte[0];
+ this.bufferPos= NEED_READING;
}
public RandomAccessFile getRaf() { return raf; }
@@ -72,20 +80,29 @@ public class TailFile {
public long getLastUpdated() { return lastUpdated; }
public boolean needTail() { return needTail; }
public Map<String, String> getHeaders() { return headers; }
+ public long getLineReadPos() { return lineReadPos; }
public void setPos(long pos) { this.pos = pos; }
public void setLastUpdated(long lastUpdated) { this.lastUpdated = lastUpdated; }
public void setNeedTail(boolean needTail) { this.needTail = needTail; }
+ public void setLineReadPos(long lineReadPos) { this.lineReadPos = lineReadPos; }
public boolean updatePos(String path, long inode, long pos) throws IOException {
if (this.inode == inode && this.path.equals(path)) {
- raf.seek(pos);
setPos(pos);
+ updateFilePos(pos);
logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);
return true;
}
return false;
}
+ public void updateFilePos(long pos) throws IOException {
+ raf.seek(pos);
+ lineReadPos = pos;
+ bufferPos= NEED_READING;
+ oldBuffer = new byte[0];
+ }
+
public List<Event> readEvents(int numEvents, boolean backoffWithoutNL,
boolean addByteOffset) throws IOException {
@@ -101,44 +118,87 @@ public class TailFile {
}
private Event readEvent(boolean backoffWithoutNL, boolean addByteOffset) throws IOException {
- Long posTmp = raf.getFilePointer();
- String line = readLine();
+ Long posTmp = getLineReadPos();
+ LineResult line = readLine();
if (line == null) {
return null;
}
- if (backoffWithoutNL && !line.endsWith(LINE_SEP)) {
+ if (backoffWithoutNL && !line.lineSepInclude) {
logger.info("Backing off in file without newline: "
+ path + ", inode: " + inode + ", pos: " + raf.getFilePointer());
- raf.seek(posTmp);
+ updateFilePos(posTmp);
return null;
}
-
- String lineSep = LINE_SEP;
- if(line.endsWith(LINE_SEP_WIN)) {
- lineSep = LINE_SEP_WIN;
- }
- Event event = EventBuilder.withBody(StringUtils.removeEnd(line, lineSep), Charsets.UTF_8);
+ Event event = EventBuilder.withBody(line.line);
if (addByteOffset == true) {
event.getHeaders().put(BYTE_OFFSET_HEADER_KEY, posTmp.toString());
}
return event;
}
- private String readLine() throws IOException {
- ByteArrayDataOutput out = ByteStreams.newDataOutput(300);
- int i = 0;
- int c;
- while ((c = raf.read()) != -1) {
- i++;
- out.write((byte) c);
- if (c == LINE_SEP.charAt(0)) {
+ private void readFile() throws IOException {
+ if ((raf.length() - raf.getFilePointer()) < BUFFER_SIZE) {
+ buffer = new byte[(int) (raf.length() - raf.getFilePointer())];
+ } else {
+ buffer = new byte[BUFFER_SIZE];
+ }
+ raf.read(buffer, 0, buffer.length);
+ bufferPos = 0;
+ }
+
+ private byte[] concatByteArrays(byte[] a, int startIdxA, int lenA, byte[] b, int startIdxB, int lenB) {
+ byte[] c = new byte[lenA + lenB];
+ System.arraycopy(a, startIdxA, c, 0, lenA);
+ System.arraycopy(b, startIdxB, c, lenA, lenB);
+ return c;
+ }
+
+ public LineResult readLine() throws IOException {
+ LineResult lineResult = null;
+ while (true) {
+ if (bufferPos == NEED_READING) {
+ if (raf.getFilePointer() < raf.length()) {
+ readFile();
+ } else {
+ if (oldBuffer.length > 0) {
+ lineResult = new LineResult(false, oldBuffer);
+ oldBuffer = new byte[0];
+ setLineReadPos(lineReadPos + lineResult.line.length);
+ }
+ break;
+ }
+ }
+ for (int i = bufferPos; i < buffer.length; i++) {
+ if (buffer[i] == BYTE_NL) {
+ int oldLen = oldBuffer.length;
+ // Don't copy last byte(NEW_LINE)
+ int lineLen = i - bufferPos;
+ // For windows, check for CR
+ if (i > 0 && buffer[i - 1] == BYTE_CR) {
+ lineLen -= 1;
+ } else if (oldBuffer.length > 0 && oldBuffer[oldBuffer.length - 1] == BYTE_CR) {
+ oldLen -= 1;
+ }
+ lineResult = new LineResult(true,
+ concatByteArrays(oldBuffer, 0, oldLen, buffer, bufferPos, lineLen));
+ setLineReadPos(lineReadPos + (oldBuffer.length + (i - bufferPos + 1)));
+ oldBuffer = new byte[0];
+ if (i + 1 < buffer.length) {
+ bufferPos = i + 1;
+ } else {
+ bufferPos = NEED_READING;
+ }
+ break;
+ }
+ }
+ if (lineResult != null) {
break;
}
+ // NEW_LINE not showed up at the end of the buffer
+ oldBuffer = concatByteArrays(oldBuffer, 0, oldBuffer.length, buffer, bufferPos, (buffer.length - bufferPos));
+ bufferPos = NEED_READING;
}
- if (i == 0) {
- return null;
- }
- return new String(out.toByteArray(), Charsets.UTF_8);
+ return lineResult;
}
public void close() {
@@ -159,5 +219,14 @@ public class TailFile {
}
}
+ private class LineResult {
+ final boolean lineSepInclude;
+ final byte[] line;
+ public LineResult(boolean lineSepInclude, byte[] line) {
+ super();
+ this.lineSepInclude = lineSepInclude;
+ this.line = line;
+ }
+ }
}