Posted to dev@flume.apache.org by "syntony liu (JIRA)" <ji...@apache.org> on 2013/09/05 16:20:51 UTC
[jira] [Comment Edited] (FLUME-2182) Spooling Directory Source
can't ingest data completely, when a file contain some wide character, such
as chinese character.
[ https://issues.apache.org/jira/browse/FLUME-2182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13759102#comment-13759102 ]
syntony liu edited comment on FLUME-2182 at 9/5/13 2:20 PM:
------------------------------------------------------------
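In addition, I modified the decoder so that it no longer throws an exception on malformed or unmappable input:

public ResettableFileInputStream(File file, PositionTracker tracker,
    int bufSize, Charset charset)
    throws IOException {
  ......
  this.decoder = charset.newDecoder();
  // Added by me. onMalformedInput/onUnmappableCharacter are the public
  // setters; the implOn* methods are protected hooks on CharsetDecoder
  // and cannot be called from this class.
  this.decoder.onMalformedInput(CodingErrorAction.REPLACE);
  this.decoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
  ......
}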
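
As a rough, self-contained illustration of what CodingErrorAction.REPLACE changes (this is not Flume code; the class LenientDecoderSketch and its method names are hypothetical, and UTF-8 is assumed as the charset), the sketch below decodes the same bytes with the default REPORT action and with REPLACE:

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

public class LenientDecoderSketch {

  // Decodes UTF-8 bytes, substituting the decoder's replacement string
  // (U+FFFD by default) for bad input instead of throwing.
  public static String decodeLenient(byte[] bytes) {
    CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
        .onMalformedInput(CodingErrorAction.REPLACE)
        .onUnmappableCharacter(CodingErrorAction.REPLACE);
    try {
      return decoder.decode(ByteBuffer.wrap(bytes)).toString();
    } catch (CharacterCodingException e) {
      // Not expected for malformed or unmappable input when REPLACE is set.
      throw new IllegalStateException(e);
    }
  }

  // Decodes with the default REPORT action; returns false when the
  // decoder rejects the input with a CharacterCodingException.
  public static boolean decodesStrictly(byte[] bytes) {
    try {
      StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap(bytes));
      return true;
    } catch (CharacterCodingException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    byte[] bad = {'a', (byte) 0xFF, 'b'}; // 0xFF never occurs in valid UTF-8
    System.out.println("strict decode succeeded: " + decodesStrictly(bad));
    System.out.println("lenient decode length: " + decodeLenient(bad).length());
  }
}
```

Note that REPLACE hides bad bytes rather than repairing them, so it does not by itself address a character that is split across two buffer refills.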
was (Author: syntonyliu):
In addition, I modified the decoder so that it no longer throws an exception on malformed or unmappable input:

public ResettableFileInputStream(File file, PositionTracker tracker,
    int bufSize, Charset charset)
    throws IOException {
  this.file = file;
  this.tracker = tracker;
  this.in = new FileInputStream(file);
  this.chan = in.getChannel();
  this.buf = ByteBuffer.allocateDirect(bufSize);
  buf.flip();
  this.byteBuf = new byte[1]; // single byte
  this.charBuf = CharBuffer.allocate(1); // single char
  charBuf.flip();
  this.fileSize = file.length();
  this.decoder = charset.newDecoder();
  // Added by me. The public setters are used here; the implOn* methods
  // are protected hooks on CharsetDecoder.
  this.decoder.onMalformedInput(CodingErrorAction.REPLACE);
  this.decoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
  this.position = 0;
  this.syncPosition = 0;
  seek(tracker.getPosition());
}
> Spooling Directory Source can't ingest data completely, when a file contain some wide character, such as chinese character.
> ---------------------------------------------------------------------------------------------------------------------------
>
> Key: FLUME-2182
> URL: https://issues.apache.org/jira/browse/FLUME-2182
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: v1.4.0
> Reporter: syntony liu
> Priority: Critical
>
> The bug is in ResettableFileInputStream.java, in int readChar().
> If the last byte in buf is only part of a wide character, readChar() shouldn't return -1 (ResettableFileInputStream.java:186), because
> the remaining data in the file is then lost.
> I fixed it as follows:
> public synchronized int readChar() throws IOException {
>   // original condition: if (!buf.hasRemaining()) {
>   // refill early so that a multi-byte character is not cut off at the end of buf
>   if (buf.limit() - buf.position() < 10) {
>     refillBuf();
>   }
>   int start = buf.position();
>   charBuf.clear();
>   boolean isEndOfInput = false;
>   if (position >= fileSize) {
>     isEndOfInput = true;
>   }
>   CoderResult res = decoder.decode(buf, charBuf, isEndOfInput);
>   if (res.isMalformed() || res.isUnmappable()) {
>     res.throwException();
>   }
>   int delta = buf.position() - start;
>   charBuf.flip();
>   if (charBuf.hasRemaining()) {
>     char c = charBuf.get();
>     // don't increment the persisted location if we are in between a
>     // surrogate pair, otherwise we may never recover if we seek() to this
>     // location!
>     incrPosition(delta, !Character.isHighSurrogate(c));
>     return c;
>   } else {
>     // there may be a partial character in the decoder buffer
>     incrPosition(delta, false);
>     return -1;
>   }
> }
> This avoids the partial-character problem but introduces a new issue: sometimes a character is repeated in some lines of a log file.
> For example:
> original file: 123456
> sink file: 1233456
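
The partial-character case in the description can be reproduced outside Flume. The sketch below is only an illustration under stated assumptions, not the ResettableFileInputStream implementation: the class SplitCharSketch and the method decodeInChunks are hypothetical, UTF-8 is assumed, and a byte array stands in for the file. It feeds the data to a CharsetDecoder through a small buffer and uses ByteBuffer.compact() to carry the undecoded bytes of a split character over to the next refill, instead of treating "no character produced" as end of input.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;

public class SplitCharSketch {

  // Decodes UTF-8 data through a buffer of bufSize bytes, refilling it
  // repeatedly. bufSize must hold at least one complete UTF-8 sequence.
  public static String decodeInChunks(byte[] data, int bufSize) {
    if (bufSize < 4) {
      throw new IllegalArgumentException("bufSize must be at least 4");
    }
    CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
    ByteBuffer buf = ByteBuffer.allocate(bufSize);
    // UTF-8 never yields more chars than it has bytes.
    CharBuffer out = CharBuffer.allocate(data.length + 1);
    int offset = 0;
    boolean endOfInput = false;
    while (!endOfInput) {
      // Refill: append new bytes after any leftover from the last round.
      int n = Math.min(buf.remaining(), data.length - offset);
      buf.put(data, offset, n);
      offset += n;
      endOfInput = (offset == data.length);
      buf.flip();
      CoderResult res = decoder.decode(buf, out, endOfInput);
      if (res.isError()) {
        throw new IllegalArgumentException("bad input: " + res);
      }
      // An UNDERFLOW with bytes still in buf means a character is split
      // across refills. compact() moves those bytes to the front so the
      // next refill completes the character rather than dropping it.
      buf.compact();
    }
    decoder.flush(out);
    out.flip();
    return out.toString();
  }

  public static void main(String[] args) {
    // "12", two 3-byte CJK characters, then "3456": 12 bytes in UTF-8.
    String text = "12\u4E2D\u65873456";
    byte[] data = text.getBytes(StandardCharsets.UTF_8);
    // With a 4-byte buffer the first CJK character straddles a refill.
    System.out.println(text.equals(decodeInChunks(data, 4)));
  }
}
```

The design point is that leftover bytes are kept exactly once: dropping them loses data, while re-reading them after they were already decoded would be one way to end up with a repeated character.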
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira