You are viewing a plain text version of this content. The canonical link to it was a hyperlink in the original page and was not preserved in this copy.
Posted to commits@flume.apache.org by ro...@apache.org on 2015/12/17 19:42:09 UTC

flume git commit: FLUME-2801. Performance improvement on TailDir source

Repository: flume
Updated Branches:
  refs/heads/trunk 88b3fee10 -> 0421fa2ab


FLUME-2801. Performance improvement on TailDir source

(Jun Seok Hong via Satoshi Iijima and Roshan Naik)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0421fa2a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0421fa2a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0421fa2a

Branch: refs/heads/trunk
Commit: 0421fa2ab1eb9575b34bbb2f44e8c6d83842eaeb
Parents: 88b3fee
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Thu Dec 17 10:35:56 2015 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Dec 17 10:39:49 2015 -0800

----------------------------------------------------------------------
 .../taildir/ReliableTaildirEventReader.java     |   4 +-
 .../apache/flume/source/taildir/TailFile.java   | 129 ++++++++++++++-----
 2 files changed, 101 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/0421fa2a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
index 951b786..5b6d465 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
@@ -195,7 +195,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
       }
       logger.info("Last read was never committed - resetting position");
       long lastPos = currentFile.getPos();
-      currentFile.getRaf().seek(lastPos);
+      currentFile.updateFilePos(lastPos);
     }
     List<Event> events = currentFile.readEvents(numEvents, backoffWithoutNL, addByteOffset);
     if (events.isEmpty()) {
@@ -223,7 +223,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
   @Override
   public void commit() throws IOException {
     if (!committed && currentFile != null) {
-      long pos = currentFile.getRaf().getFilePointer();
+      long pos = currentFile.getLineReadPos();
       currentFile.setPos(pos);
       currentFile.setLastUpdated(updateTime);
       committed = true;

http://git-wip-us.apache.org/repos/asf/flume/blob/0421fa2a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
index 99683da..eabd357 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
@@ -28,22 +28,21 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
 
 public class TailFile {
   private static final Logger logger = LoggerFactory.getLogger(TailFile.class);
 
-  private static final String LINE_SEP = "\n";
-  private static final String LINE_SEP_WIN = "\r\n";
+  private static final byte BYTE_NL = (byte) 10;
+  private static final byte BYTE_CR = (byte) 13;
+
+  private final static int BUFFER_SIZE = 8192;
+  private final static int NEED_READING = -1;
 
   private RandomAccessFile raf;
   private final String path;
@@ -52,17 +51,26 @@ public class TailFile {
   private long lastUpdated;
   private boolean needTail;
   private final Map<String, String> headers;
+  private byte[] buffer;
+  private byte[] oldBuffer;
+  private int bufferPos;
+  private long lineReadPos;
 
   public TailFile(File file, Map<String, String> headers, long inode, long pos)
       throws IOException {
     this.raf = new RandomAccessFile(file, "r");
-    if (pos > 0) raf.seek(pos);
+    if (pos > 0) {
+      raf.seek(pos);
+      lineReadPos=pos;
+    }
     this.path = file.getAbsolutePath();
     this.inode = inode;
     this.pos = pos;
     this.lastUpdated = 0L;
     this.needTail = true;
     this.headers = headers;
+    this.oldBuffer = new byte[0];
+    this.bufferPos= NEED_READING;
   }
 
   public RandomAccessFile getRaf() { return raf; }
@@ -72,20 +80,29 @@ public class TailFile {
   public long getLastUpdated() { return lastUpdated; }
   public boolean needTail() { return needTail; }
   public Map<String, String> getHeaders() { return headers; }
+  public long getLineReadPos() { return lineReadPos; }
 
   public void setPos(long pos) { this.pos = pos; }
   public void setLastUpdated(long lastUpdated) { this.lastUpdated = lastUpdated; }
   public void setNeedTail(boolean needTail) { this.needTail = needTail; }
+  public void setLineReadPos(long lineReadPos) { this.lineReadPos = lineReadPos; }
 
   public boolean updatePos(String path, long inode, long pos) throws IOException {
     if (this.inode == inode && this.path.equals(path)) {
-      raf.seek(pos);
       setPos(pos);
+      updateFilePos(pos);
       logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);
       return true;
     }
     return false;
   }
+  public void updateFilePos(long pos) throws IOException {
+    raf.seek(pos);
+    lineReadPos = pos;
+    bufferPos= NEED_READING;
+    oldBuffer = new byte[0];
+  }
+
 
   public List<Event> readEvents(int numEvents, boolean backoffWithoutNL,
       boolean addByteOffset) throws IOException {
@@ -101,44 +118,87 @@ public class TailFile {
   }
 
   private Event readEvent(boolean backoffWithoutNL, boolean addByteOffset) throws IOException {
-    Long posTmp = raf.getFilePointer();
-    String line = readLine();
+    Long posTmp = getLineReadPos();
+    LineResult line = readLine();
     if (line == null) {
       return null;
     }
-    if (backoffWithoutNL && !line.endsWith(LINE_SEP)) {
+    if (backoffWithoutNL && !line.lineSepInclude) {
       logger.info("Backing off in file without newline: "
           + path + ", inode: " + inode + ", pos: " + raf.getFilePointer());
-      raf.seek(posTmp);
+      updateFilePos(posTmp);
       return null;
     }
-
-    String lineSep = LINE_SEP;
-    if(line.endsWith(LINE_SEP_WIN)) {
-      lineSep = LINE_SEP_WIN;
-    }
-    Event event = EventBuilder.withBody(StringUtils.removeEnd(line, lineSep), Charsets.UTF_8);
+    Event event = EventBuilder.withBody(line.line);
     if (addByteOffset == true) {
       event.getHeaders().put(BYTE_OFFSET_HEADER_KEY, posTmp.toString());
     }
     return event;
   }
 
-  private String readLine() throws IOException {
-    ByteArrayDataOutput out = ByteStreams.newDataOutput(300);
-    int i = 0;
-    int c;
-    while ((c = raf.read()) != -1) {
-      i++;
-      out.write((byte) c);
-      if (c == LINE_SEP.charAt(0)) {
+  private void readFile() throws IOException {
+    if ((raf.length() - raf.getFilePointer()) < BUFFER_SIZE) {
+      buffer = new byte[(int) (raf.length() - raf.getFilePointer())];
+    } else {
+      buffer = new byte[BUFFER_SIZE];
+    }
+    raf.read(buffer, 0, buffer.length);
+    bufferPos = 0;
+  }
+
+  private byte[] concatByteArrays(byte[] a, int startIdxA, int lenA, byte[] b, int startIdxB, int lenB) {
+    byte[] c = new byte[lenA + lenB];
+    System.arraycopy(a, startIdxA, c, 0, lenA);
+    System.arraycopy(b, startIdxB, c, lenA, lenB);
+    return c;
+  }
+
+  public LineResult readLine() throws IOException {
+    LineResult lineResult = null;
+    while (true) {
+      if (bufferPos == NEED_READING) {
+        if (raf.getFilePointer() < raf.length()) {
+          readFile();
+        } else {
+          if (oldBuffer.length > 0) {
+            lineResult = new LineResult(false, oldBuffer);
+            oldBuffer = new byte[0];
+            setLineReadPos(lineReadPos + lineResult.line.length);
+          }
+          break;
+        }
+      }
+      for (int i = bufferPos; i < buffer.length; i++) {
+        if (buffer[i] == BYTE_NL) {
+          int oldLen = oldBuffer.length;
+          // Don't copy last byte(NEW_LINE)
+          int lineLen = i - bufferPos;
+          // For windows, check for CR
+          if (i > 0 && buffer[i - 1] == BYTE_CR) {
+            lineLen -= 1;
+          } else if (oldBuffer.length > 0 && oldBuffer[oldBuffer.length - 1] == BYTE_CR) {
+            oldLen -= 1;
+          }
+          lineResult = new LineResult(true,
+              concatByteArrays(oldBuffer, 0, oldLen, buffer, bufferPos, lineLen));
+          setLineReadPos(lineReadPos + (oldBuffer.length + (i - bufferPos + 1)));
+          oldBuffer = new byte[0];
+          if (i + 1 < buffer.length) {
+            bufferPos = i + 1;
+          } else {
+            bufferPos = NEED_READING;
+          }
+          break;
+        }
+      }
+      if (lineResult != null) {
         break;
       }
+      // NEW_LINE not showed up at the end of the buffer
+      oldBuffer = concatByteArrays(oldBuffer, 0, oldBuffer.length, buffer, bufferPos, (buffer.length - bufferPos));
+      bufferPos = NEED_READING;
     }
-    if (i == 0) {
-      return null;
-    }
-    return new String(out.toByteArray(), Charsets.UTF_8);
+    return lineResult;
   }
 
   public void close() {
@@ -159,5 +219,14 @@ public class TailFile {
     }
   }
 
+  private class LineResult {
+    final boolean lineSepInclude;
+    final byte[] line;
 
+    public LineResult(boolean lineSepInclude, byte[] line) {
+      super();
+      this.lineSepInclude = lineSepInclude;
+      this.line = line;
+    }
+  }
 }