Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/04 13:34:33 UTC

[GitHub] [kafka] wj1918 opened a new pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect

wj1918 opened a new pull request #8612:
URL: https://github.com/apache/kafka/pull/8612


   1. Trigger buffer expansion when no complete line can be extracted from the buffer
   2. Limit the buffer size to Integer.MAX_VALUE
   3. Use a single loop inside FileStreamSourceTask.poll
   4. Close the stream before throwing ConnectException
   5. Move the stream- and buffer-related functions to a separate class, FileStreamBuffer

   Check the JIRA comment for the steps to verify.
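
Items 1 and 2 in the list above can be illustrated with a minimal sketch. It assumes the NegativeArraySizeException came from doubling an `int` buffer length once it reached 2^30, where the product wraps to a negative value. The class and method names are hypothetical and are not taken from the PR.

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; this is not the PR's FileStreamBuffer.
final class BufferGrowthSketch {
    private BufferGrowthSketch() { }

    // Returns the next buffer capacity, capped at Integer.MAX_VALUE.
    static int nextCapacity(int current) {
        if (current <= 0)
            throw new IllegalArgumentException("capacity must be positive: " + current);
        if (current == Integer.MAX_VALUE)
            throw new IllegalStateException("buffer is already at the maximum size");
        // Double in long arithmetic so the result cannot wrap to a negative int,
        // then clamp to the cap named in item 2.
        return (int) Math.min(2L * current, Integer.MAX_VALUE);
    }

    // Copies the existing contents into a larger array.
    static char[] grow(char[] buffer) {
        return Arrays.copyOf(buffer, nextCapacity(buffer.length));
    }
}
```

Some JVMs refuse to allocate arrays within a few elements of Integer.MAX_VALUE, so a real implementation may want a slightly lower cap.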
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wj1918 commented on a change in pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect

Posted by GitBox <gi...@apache.org>.
wj1918 commented on a change in pull request #8612:
URL: https://github.com/apache/kafka/pull/8612#discussion_r428622562



##########
File path: connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
##########
@@ -77,91 +68,34 @@ public void start(Map<String, String> props) {
 
     @Override
     public List<SourceRecord> poll() throws InterruptedException {
-        if (stream == null) {
-            try {
-                stream = Files.newInputStream(Paths.get(filename));
-                Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
-                if (offset != null) {
-                    Object lastRecordedOffset = offset.get(POSITION_FIELD);
-                    if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long))
-                        throw new ConnectException("Offset position is the incorrect type");
-                    if (lastRecordedOffset != null) {
-                        log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
-                        long skipLeft = (Long) lastRecordedOffset;
-                        while (skipLeft > 0) {
-                            try {
-                                long skipped = stream.skip(skipLeft);
-                                skipLeft -= skipped;
-                            } catch (IOException e) {
-                                log.error("Error while trying to seek to previous offset in file {}: ", filename, e);
-                                throw new ConnectException(e);
-                            }
-                        }
-                        log.debug("Skipped to offset {}", lastRecordedOffset);
-                    }
-                    streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L;
-                } else {
-                    streamOffset = 0L;
-                }
-                reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
-                log.debug("Opened {} for reading", logFilename());
-            } catch (NoSuchFileException e) {
-                log.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", logFilename());
-                synchronized (this) {
-                    this.wait(1000);
-                }
-                return null;
-            } catch (IOException e) {
-                log.error("Error while trying to open file {}: ", filename, e);
-                throw new ConnectException(e);
-            }
-        }
+        if (!fileStreamBuffer.ensureOpen(() -> context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename))))
+            return null;
 
         // Unfortunately we can't just use readLine() because it blocks in an uninterruptible way.
         // Instead we have to manage splitting lines ourselves, using simple backoff when no new data
         // is available.
         try {
-            final BufferedReader readerCopy;

Review comment:
       Removed the copy of the reader; poll is single-threaded for this type of task.
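
The offset-skipping loop removed from poll in the hunk above relies on `InputStream.skip`, which may return 0 without reaching the requested position. A minimal sketch of a skip helper that always makes progress or fails is shown here. The class and method names are hypothetical, and this is not the code that the PR places in FileStreamBuffer.ensureOpen.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper for illustration only.
final class OffsetSkipSketch {
    private OffsetSkipSketch() { }

    // Skips exactly `bytes` bytes, or throws EOFException if the stream ends first.
    static void skipFully(InputStream in, long bytes) throws IOException {
        long left = bytes;
        while (left > 0) {
            long skipped = in.skip(left);
            if (skipped > 0) {
                left -= skipped;
                continue;
            }
            // skip() returned 0: read a single byte to tell "no progress yet"
            // apart from end of stream, so the loop cannot spin forever.
            if (in.read() < 0)
                throw new EOFException("stream ended with " + left + " bytes left to skip");
            left--;
        }
    }
}
```

Closing the stream before wrapping the failure in a ConnectException, as item 4 of the PR description proposes, would be the caller's responsibility in this sketch.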







[GitHub] [kafka] wj1918 edited a comment on pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect

Posted by GitBox <gi...@apache.org>.
wj1918 edited a comment on pull request #8612:
URL: https://github.com/apache/kafka/pull/8612#issuecomment-631579687


   Calling for reviewers: @kkonstantine @phstudy @ewencp @hachikuji
   The refactoring and the bug fix should have been separate commits.
   
   Major changed blocks of FileStreamSourceTask.java:
   
   lines 66-71  : moved to FileStreamBuffer.setFilename and FileStreamBuffer.ensureOpen
   lines 124-129: removed; poll is single-threaded for this type of task.
   lines 133-143: moved to FileStreamBuffer.extractLine
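
For readers unfamiliar with the line-splitting step, here is a minimal sketch of what an extractLine-style method does: scan the buffered characters for a terminator, return the line before it, and shift the remainder to the front. The class below is hypothetical and simplified; it is not the FileStreamBuffer from this PR and does not track stream offsets.

```java
import java.util.Arrays;

// Hypothetical, simplified line buffer for illustration only.
final class LineBufferSketch {
    private char[] buffer = new char[16];
    private int used = 0;

    // Appends newly read characters, growing the array when needed.
    void append(String chunk) {
        int needed = Math.addExact(used, chunk.length()); // throws instead of wrapping
        if (needed > buffer.length) {
            int doubled = (int) Math.min(2L * buffer.length, Integer.MAX_VALUE);
            buffer = Arrays.copyOf(buffer, Math.max(needed, doubled));
        }
        chunk.getChars(0, chunk.length(), buffer, used);
        used = needed;
    }

    // Returns the next complete line without its terminator, or null if none is buffered yet.
    String extractLine() {
        for (int i = 0; i < used; i++) {
            char c = buffer[i];
            if (c != '\n' && c != '\r')
                continue;
            int next = i + 1;
            if (c == '\r') {
                // A trailing \r may be the first half of \r\n, so wait for more data.
                if (next >= used)
                    return null;
                if (buffer[next] == '\n')
                    next++;
            }
            String line = new String(buffer, 0, i);
            // Shift the unconsumed characters to the front of the buffer.
            System.arraycopy(buffer, next, buffer, 0, used - next);
            used -= next;
            return line;
        }
        return null;
    }
}
```

When extractLine returns null and the buffer is full, the caller has to grow the buffer and read more data, which is the situation item 1 of the PR description addresses.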
   
   





[GitHub] [kafka] wj1918 commented on pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect

Posted by GitBox <gi...@apache.org>.
wj1918 commented on pull request #8612:
URL: https://github.com/apache/kafka/pull/8612#issuecomment-631579687


   Calling for reviewers: @kkonstantine @phstudy @ewencp @hachikuji
   The refactoring should have been kept separate from the bug fix, but it is a fairly safe fix.





[GitHub] [kafka] wj1918 commented on a change in pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect

Posted by GitBox <gi...@apache.org>.
wj1918 commented on a change in pull request #8612:
URL: https://github.com/apache/kafka/pull/8612#discussion_r428621073



##########
File path: connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
##########
@@ -63,12 +59,7 @@ public String version() {
     @Override
     public void start(Map<String, String> props) {
         filename = props.get(FileStreamSourceConnector.FILE_CONFIG);

Review comment:
       Moved the logic from lines 66-71 to FileStreamBuffer.setFilename and FileStreamBuffer.ensureOpen.







[GitHub] [kafka] wj1918 closed pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect

Posted by GitBox <gi...@apache.org>.
wj1918 closed pull request #8612:
URL: https://github.com/apache/kafka/pull/8612


   





[GitHub] [kafka] wj1918 commented on a change in pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect

Posted by GitBox <gi...@apache.org>.
wj1918 commented on a change in pull request #8612:
URL: https://github.com/apache/kafka/pull/8612#discussion_r428618604



##########
File path: connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
##########
@@ -63,12 +59,7 @@ public String version() {
     @Override
     public void start(Map<String, String> props) {
         filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
-        if (filename == null || filename.isEmpty()) {

Review comment:
       Moved to FileStreamBuffer.setFilename and FileStreamBuffer.ensureOpen.







[GitHub] [kafka] wj1918 edited a comment on pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect

Posted by GitBox <gi...@apache.org>.
wj1918 edited a comment on pull request #8612:
URL: https://github.com/apache/kafka/pull/8612#issuecomment-631579687


   Calling for reviewers: @kkonstantine @phstudy @ewencp @hachikuji
   The refactoring and the bug fix should have been separate commits, but it is a fairly safe fix.





[GitHub] [kafka] wj1918 commented on pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect

Posted by GitBox <gi...@apache.org>.
wj1918 commented on pull request #8612:
URL: https://github.com/apache/kafka/pull/8612#issuecomment-632819461


   @kkonstantine Thanks for the comment. Understood; I will close this PR and create separate PRs when ready.





[GitHub] [kafka] kkonstantine commented on pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8612:
URL: https://github.com/apache/kafka/pull/8612#issuecomment-632265994


   @wj1918 thanks for opening a PR! 
   
   I'd definitely recommend fleshing out the bugfix in a separate PR from the rest of the refactoring, since the latter does not seem trivial. Additionally, w/r/t the refactoring, I'd suggest thinking about whether it is essential. Keep in mind that this connector is available for demonstration purposes only, and therefore its maintenance is not very heavy. But, back to the first point, before reviewing any changes, it'd be good to have them in separate PRs, since that will significantly help reviewers, at least me.





[GitHub] [kafka] kkonstantine commented on pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8612:
URL: https://github.com/apache/kafka/pull/8612#issuecomment-632820983


   Thanks @wj1918 !
   Happy to review after they are ready. 

