Posted to issues@flink.apache.org by Elbehery <gi...@git.apache.org> on 2015/02/27 17:41:33 UTC

[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

GitHub user Elbehery opened a pull request:

    https://github.com/apache/flink/pull/442

    [FLINK-1615] [java api] SimpleTweetInputFormat

    This is a contribution with a TweetInputFormat, Jira [FLINK-1615](https://issues.apache.org/jira/browse/FLINK-1615).
    
    This commit contains the InputFormat, the POJOs for the tweet and its nested objects, the Handler, and a unit test with a test file of four tweets.
    
    Before pushing, the branch was rebased against upstream/master.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Elbehery/flink Tweet-into-Pojo

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/442.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #442
    
----
commit f25e2e83d60992ac11e38c5a842e185e2ad1c134
Author: elbehery <en...@live.com>
Date:   2015-02-27T16:34:27Z

    [FLINK-1615] [java api] SimpleTweetInputFormat

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-84912485
  
    I triggered another travis build: https://travis-ci.org/rmetzger/flink/builds/55459787



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r26296679
  
    --- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java ---
    @@ -0,0 +1,76 @@
    +package org.apache.flink.contrib;
    --- End diff --
    
    DONE



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-92100736
  
    @rmetzger Still not merged. Any updates?



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r25566680
  
    --- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java ---
    @@ -0,0 +1,62 @@
    +package org.apache.flink.contrib;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +
    +public class SimpleTweetInputFormatTest {
    +
    +    private Tweet tweet;
    +
    +    private SimpleTweetInputFormat simpleTweetInputFormat;
    +
    +    private FileInputSplit fileInputSplit;
    +
    +    protected Configuration config;
    +
    +    protected File tempFile;
    +
    +
    +    @Before
    +    public void testSetUp() {
    +
    +
    +        simpleTweetInputFormat = new SimpleTweetInputFormat();
    +
    +        File jsonFile = new  File("../flink-contrib/src/main/resources/HashTagTweetSample.json");
    +
    +        fileInputSplit = new FileInputSplit(0, new Path(jsonFile.getPath()), 0, jsonFile.length(), new String[] {"localhost"});
    +    }
    +
    +    @Test
    +    public void testTweetInput() throws Exception {
    +
    +
    +        simpleTweetInputFormat.open(fileInputSplit);
    +        while (!simpleTweetInputFormat.reachedEnd()) {
    --- End diff --
    
    The test doesn't validate that the results are as expected.
    Our tests usually have some JUnit assertions to make sure everything is as expected.



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-95570894
  
    Where is your git repository, so that I can check out your commit and merge it?




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-85686759
  
    The failure is not your fault. It failed because your code has been rebased to a master version with failing tests.



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-95603365
  
    @aljoscha You should be able to see it from the PR. Anyway, this is mine: https://github.com/Elbehery/flink



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-76759729
  
    @Elbehery Nice addition, I would like to add this.
    
    The build check also mentions that the license headers are missing. Can you add them to the files?



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-84968643
  
    @rmetzger I think it failed again, but I can't see the reason.



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r26289003
  
    --- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java ---
    @@ -0,0 +1,76 @@
    +package org.apache.flink.contrib;
    --- End diff --
    
    Missing license header.



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-95512206
  
    I did rebase against master before creating the PR, but that was a long time ago. Could this be the cause of the conflicts?



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-78480294
  
    @rmetzger This is the standard tweet format as defined by Twitter; see the [Twitter Official Documentation](https://dev.twitter.com/overview/api/tweets). My parser retrieves the whole tweet except the Bounding Box object and the retweeted object.



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-83519001
  
    @StephanEwen, I have removed the license from the resource file, excluded it from RAT, and added the license file. I also rebased against upstream/master before committing.



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-95511047
  
    This looks good to merge. Any objections?



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r26284910
  
    --- Diff: flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java ---
    @@ -0,0 +1,67 @@
    +package org.apache.flink.contrib.tweetinputformat.io;
    +
    +import org.apache.flink.api.common.io.DelimitedInputFormat;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
    +import org.codehaus.jackson.JsonParseException;
    +import org.json.simple.parser.JSONParser;
    +import org.json.simple.parser.ParseException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +
    +
    +public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implements ResultTypeQueryable<Tweet> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class);
    +
    +    @Override
    +    public Tweet nextRecord(Tweet record) throws IOException {
    +        Boolean result = false;
    +
    +        do {
    +            try {
    +                record.reset(0);
    +                record = super.nextRecord(record);
    +                result = true;
    +
    +            } catch (JsonParseException e) {
    +                result = false;
    +
    +            }
    +        } while (!result);
    +
    +        return record;
    +    }
    +
    +    @Override
    +    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException {
    +
    +
    +        InputStreamReader jsonReader = new InputStreamReader(new ByteArrayInputStream(bytes));
    +        jsonReader.skip(offset);
    +
    +        JSONParser parser = new JSONParser();
    --- End diff --
    
    I know. At first I tried to create them as instance fields in the class, but I received an error because the fields are not serializable. I then tried declaring them as transient, but Flink threw a "Can not submit Job" exception. If you have suggestions, please let me know.



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r26288077
  
    --- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java ---
    @@ -0,0 +1,62 @@
    +package org.apache.flink.contrib;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +
    +public class SimpleTweetInputFormatTest {
    +
    +    private Tweet tweet;
    +
    +    private SimpleTweetInputFormat simpleTweetInputFormat;
    +
    +    private FileInputSplit fileInputSplit;
    +
    +    protected Configuration config;
    +
    +    protected File tempFile;
    +
    +
    +    @Before
    +    public void testSetUp() {
    +
    +
    +        simpleTweetInputFormat = new SimpleTweetInputFormat();
    +
    +        File jsonFile = new  File("../flink-contrib/src/main/resources/HashTagTweetSample.json");
    +
    +        fileInputSplit = new FileInputSplit(0, new Path(jsonFile.getPath()), 0, jsonFile.length(), new String[] {"localhost"});
    +    }
    +
    +    @Test
    +    public void testTweetInput() throws Exception {
    +
    +
    +        simpleTweetInputFormat.open(fileInputSplit);
    +        while (!simpleTweetInputFormat.reachedEnd()) {
    --- End diff --
    
    Changed. I tried to use a "switch" instead of multiple "if-else" branches because it is more efficient, but Flink is compiled with Java 1.6, which does not accept a "String" in a switch statement.
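
For reference, `switch` on a `String` was only added in Java 7, so Java 6 compatible code has to compare strings explicitly. A minimal sketch, assuming a few illustrative key names (`text`, `lang`, `id`) and a hypothetical `Field` enum that are not taken from the pull request:

```java
public class KeyDispatchSketch {

    /** Hypothetical field categories; the names are illustrative only. */
    enum Field { TEXT, LANG, ID, UNKNOWN }

    /** Java 6 compatible dispatch: uses equals() chains instead of switch (String). */
    static Field classify(String key) {
        // Putting the literal first keeps the comparison safe when key is null.
        if ("text".equals(key)) {
            return Field.TEXT;
        } else if ("lang".equals(key)) {
            return Field.LANG;
        } else if ("id".equals(key)) {
            return Field.ID;
        } else {
            return Field.UNKNOWN;
        }
    }

    public static void main(String[] args) {
        System.out.println(classify("text"));
        System.out.println(classify("retweeted"));
        System.out.println(classify(null));
    }
}
```

Mapping each key to an enum constant once also means later code can use an ordinary `switch` on the enum, which Java 6 does support.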



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r26288947
  
    --- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java ---
    @@ -0,0 +1,76 @@
    +package org.apache.flink.contrib;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +import java.io.File;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +
    +public class SimpleTweetInputFormatTest {
    +
    +    private Tweet tweet;
    +
    +    private SimpleTweetInputFormat simpleTweetInputFormat;
    +
    +    private FileInputSplit fileInputSplit;
    +
    +    protected Configuration config;
    +
    +    protected File tempFile;
    +
    +
    +    @Before
    +    public void testSetUp() {
    +
    +
    +        simpleTweetInputFormat = new SimpleTweetInputFormat();
    +
    +        File jsonFile = new  File("../flink-contrib/src/main/resources/HashTagTweetSample.json");
    +
    +        fileInputSplit = new FileInputSplit(0, new Path(jsonFile.getPath()), 0, jsonFile.length(), new String[] {"localhost"});
    +    }
    +
    +    @Test
    +    public void testTweetInput() throws Exception {
    +
    +
    +        simpleTweetInputFormat.open(fileInputSplit);
    +        List<String> result;
    +
    +        while (!simpleTweetInputFormat.reachedEnd()) {
    +            tweet = new Tweet();
    +            tweet = simpleTweetInputFormat.nextRecord(tweet);
    +
    +            if(tweet != null){
    --- End diff --
    
    Can you make the test fail if `tweet` is null?
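
One way to make such a loop fail loudly is sketched below in plain Java. It is only an illustration: `countNonNull` is a hypothetical helper and strings stand in for tweets. In the actual JUnit 4 test, `Assert.assertNotNull` and `Assert.assertEquals` would play the role of the explicit checks.

```java
import java.util.Arrays;
import java.util.Iterator;

public class FailOnNullSketch {

    /**
     * Counts the records, throwing as soon as one of them is null
     * instead of silently skipping it behind an if (record != null) guard.
     */
    static int countNonNull(Iterator<String> records) {
        int count = 0;
        while (records.hasNext()) {
            String record = records.next();
            if (record == null) {
                throw new AssertionError("record at index " + count + " was null");
            }
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // Stand-in data; the test file in the pull request holds four tweets.
        Iterator<String> sample = Arrays.asList("t1", "t2", "t3", "t4").iterator();
        int seen = countNonNull(sample);
        // Also check the total, so an empty or truncated input fails too.
        if (seen != 4) {
            throw new AssertionError("expected 4 records but read " + seen);
        }
        System.out.println("read " + seen + " records");
    }
}
```

Checking the total count as well as each record guards against the case where the loop body never runs at all.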



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-82254957
  
    I have revised the commit. All the files have the license header, except a resource file which contains 4 tweets for testing purposes. Could this be the problem?



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-76594115
  
    Thank you for the contribution.
    
    Is the JSON format parsed by this input format a standard format produced by Twitter?
    How can I get such input data?



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r25566601
  
    --- Diff: flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java ---
    @@ -0,0 +1,67 @@
    +package org.apache.flink.contrib.tweetinputformat.io;
    +
    +import org.apache.flink.api.common.io.DelimitedInputFormat;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
    +import org.codehaus.jackson.JsonParseException;
    +import org.json.simple.parser.JSONParser;
    +import org.json.simple.parser.ParseException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +
    +
    +public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implements ResultTypeQueryable<Tweet> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class);
    +
    +    @Override
    +    public Tweet nextRecord(Tweet record) throws IOException {
    +        Boolean result = false;
    +
    +        do {
    +            try {
    +                record.reset(0);
    +                record = super.nextRecord(record);
    +                result = true;
    +
    +            } catch (JsonParseException e) {
    +                result = false;
    +
    +            }
    +        } while (!result);
    +
    +        return record;
    +    }
    +
    +    @Override
    +    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException {
    +
    +
    +        InputStreamReader jsonReader = new InputStreamReader(new ByteArrayInputStream(bytes));
    +        jsonReader.skip(offset);
    +
    +        JSONParser parser = new JSONParser();
    +        TweetHandler handler = new TweetHandler();
    +
    +        try {
    +
    +            handler.reuse = reuse;
    +            parser.parse(jsonReader, handler, false);
    +        } catch (ParseException e) {
    +
    +            LOG.debug("Class "+SimpleTweetInputFormat.class+" "+e.getMessage() );
    --- End diff --
    
    Why are you logging the classname there?



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-83499608
  
    @StephanEwen I have checked and found that the RAT plugin is included in the parent pom.xml file. Shall I exclude the resources folder in flink-contrib, or add the plugin to flink-contrib and exclude the file from there?



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r25566598
  
    --- Diff: flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java ---
    @@ -0,0 +1,67 @@
    +package org.apache.flink.contrib.tweetinputformat.io;
    +
    +import org.apache.flink.api.common.io.DelimitedInputFormat;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
    +import org.codehaus.jackson.JsonParseException;
    +import org.json.simple.parser.JSONParser;
    +import org.json.simple.parser.ParseException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +
    +
    +public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implements ResultTypeQueryable<Tweet> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class);
    +
    +    @Override
    +    public Tweet nextRecord(Tweet record) throws IOException {
    +        Boolean result = false;
    +
    +        do {
    +            try {
    +                record.reset(0);
    +                record = super.nextRecord(record);
    +                result = true;
    +
    +            } catch (JsonParseException e) {
    +                result = false;
    +
    +            }
    +        } while (!result);
    +
    +        return record;
    +    }
    +
    +    @Override
    +    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException {
    +
    +
    +        InputStreamReader jsonReader = new InputStreamReader(new ByteArrayInputStream(bytes));
    +        jsonReader.skip(offset);
    +
    +        JSONParser parser = new JSONParser();
    --- End diff --
    
    I don't think you need to create a new parser and handler for each record.
    You can probably improve the performance by reusing the parser.



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-81834859
  
    Why did it fail?



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r26285316
  
    --- Diff: flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java ---
    @@ -0,0 +1,67 @@
    +package org.apache.flink.contrib.tweetinputformat.io;
    +
    +import org.apache.flink.api.common.io.DelimitedInputFormat;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
    +import org.codehaus.jackson.JsonParseException;
    +import org.json.simple.parser.JSONParser;
    +import org.json.simple.parser.ParseException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +
    +
    +public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implements ResultTypeQueryable<Tweet> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class);
    +
    +    @Override
    +    public Tweet nextRecord(Tweet record) throws IOException {
    +        Boolean result = false;
    +
    +        do {
    +            try {
    +                record.reset(0);
    +                record = super.nextRecord(record);
    +                result = true;
    +
    +            } catch (JsonParseException e) {
    +                result = false;
    +
    +            }
    +        } while (!result);
    +
    +        return record;
    +    }
    +
    +    @Override
    +    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException {
    +
    +
    +        InputStreamReader jsonReader = new InputStreamReader(new ByteArrayInputStream(bytes));
    +        jsonReader.skip(offset);
    +
    +        JSONParser parser = new JSONParser();
    --- End diff --
    
    Putting the parser into a `transient` field and initializing it in the `open()` method is the way to go.
    
    Can you give me the full stacktrace for the "Can not submit Job" exception?
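
The transient-field approach suggested above can be sketched in plain Java without any Flink dependency. This is a minimal illustration, not the code from the pull request: `RecordParser`, `SketchInputFormat`, and `roundTrip` are hypothetical names, and the serialization round trip only mimics how an input format might be shipped to a worker before `open()` is called.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientParserSketch {

    /** Hypothetical stand-in for a parser that is NOT Serializable. */
    static class RecordParser {
        int parsedCount = 0;

        String parse(String line) {
            parsedCount++;
            return line.trim();
        }
    }

    /** Hypothetical stand-in for an input format that is serialized before use. */
    static class SketchInputFormat implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: skipped by Java serialization, so it is null after deserialization.
        private transient RecordParser parser;

        /** Creates the parser once per open() call rather than once per record. */
        void open() {
            parser = new RecordParser();
        }

        /** Reuses the parser created in open() for every record. */
        String readRecord(String line) {
            return parser.parse(line);
        }

        int parsedCount() {
            return parser.parsedCount;
        }
    }

    /** Serializes and deserializes the format, mimicking shipping it to a worker. */
    static SketchInputFormat roundTrip(SketchInputFormat format)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(format);
        out.close();
        ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        try {
            return (SketchInputFormat) in.readObject();
        } finally {
            in.close();
        }
    }

    public static void main(String[] args) throws Exception {
        SketchInputFormat shipped = roundTrip(new SketchInputFormat());
        shipped.open(); // the non-serializable parser is created only after shipping
        System.out.println(shipped.readRecord("  first record  "));
        System.out.println(shipped.readRecord("second record"));
        System.out.println("records parsed: " + shipped.parsedCount());
    }
}
```

Because the field is `transient`, serialization never touches the parser, and because it is assigned in `open()`, every record read afterwards shares one parser instance.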



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-95846017
  
    The problem is that I can't see it in the GitHub interface. Which branch are your changes on? Could you please rebase them on top of the current master?


---

[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r26285363
  
    --- Diff: flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java ---
    @@ -0,0 +1,67 @@
    +package org.apache.flink.contrib.tweetinputformat.io;
    +
    +import org.apache.flink.api.common.io.DelimitedInputFormat;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
    +import org.codehaus.jackson.JsonParseException;
    +import org.json.simple.parser.JSONParser;
    +import org.json.simple.parser.ParseException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +
    +
    +public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implements ResultTypeQueryable<Tweet> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class);
    +
    +    @Override
    +    public Tweet nextRecord(Tweet record) throws IOException {
    +        Boolean result = false;
    +
    +        do {
    +            try {
    +                record.reset(0);
    +                record = super.nextRecord(record);
    +                result = true;
    +
    +            } catch (JsonParseException e) {
    +                result = false;
    +
    +            }
    +        } while (!result);
    +
    +        return record;
    +    }
    +
    +    @Override
    +    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException {
    +
    +
    +        InputStreamReader jsonReader = new InputStreamReader(new ByteArrayInputStream(bytes));
    +        jsonReader.skip(offset);
    +
    +        JSONParser parser = new JSONParser();
    +        TweetHandler handler = new TweetHandler();
    +
    +        try {
    +
    +            handler.reuse = reuse;
    +            parser.parse(jsonReader, handler, false);
    +        } catch (ParseException e) {
    +
    +            LOG.debug("Class "+SimpleTweetInputFormat.class+" "+e.getMessage() );
    --- End diff --
    
    Edited


---

[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r26295918
  
    --- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java ---
    @@ -0,0 +1,76 @@
    +package org.apache.flink.contrib;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +import java.io.File;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +
    +public class SimpleTweetInputFormatTest {
    +
    +    private Tweet tweet;
    +
    +    private SimpleTweetInputFormat simpleTweetInputFormat;
    +
    +    private FileInputSplit fileInputSplit;
    +
    +    protected Configuration config;
    +
    +    protected File tempFile;
    +
    +
    +    @Before
    +    public void testSetUp() {
    +
    +
    +        simpleTweetInputFormat = new SimpleTweetInputFormat();
    +
    +        File jsonFile = new  File("../flink-contrib/src/main/resources/HashTagTweetSample.json");
    +
    +        fileInputSplit = new FileInputSplit(0, new Path(jsonFile.getPath()), 0, jsonFile.length(), new String[] {"localhost"});
    +    }
    +
    +    @Test
    +    public void testTweetInput() throws Exception {
    +
    +
    +        simpleTweetInputFormat.open(fileInputSplit);
    +        List<String> result;
    +
    +        while (!simpleTweetInputFormat.reachedEnd()) {
    +            tweet = new Tweet();
    +            tweet = simpleTweetInputFormat.nextRecord(tweet);
    +
    +            if(tweet != null){
    --- End diff --
    
    I tried to do so, but the problem is that `reachedEnd()` is only updated inside `nextRecord()` in `DelimitedInputFormat`. I tried to add a condition that skips the `nextRecord()` call when `readPos == limit`, but I could not because these fields are private.
    
    So I have to stop reading when `reachedEnd()` is true, which in turn means that the object returned from `nextRecord()` (the tweet) is null. If I add the required assert, all the tests fail. I do not know how to avoid this problem. Do you have a suggestion?
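
One possible way to structure such a read loop is sketched below. It is a minimal illustration, assuming the behavior described in this comment: the end flag only flips inside `nextRecord()`, which then returns null. `LazyEndReader` and `ReadLoopDemo` are hypothetical names, not Flink classes. The idea is to treat a null record as "no more data" and to assert on the collected records after the loop instead of on every returned object.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical reader that mimics the described behavior: the end of input
// is discovered only while trying to read the next record.
class LazyEndReader {
    private final Iterator<String> lines;
    private boolean end = false;

    LazyEndReader(List<String> input) {
        this.lines = input.iterator();
    }

    boolean reachedEnd() {
        return end;
    }

    String nextRecord() {
        if (!lines.hasNext()) {
            end = true;   // the flag flips here, not before the call
            return null;  // the final call yields null
        }
        return lines.next();
    }
}

class ReadLoopDemo {
    // Collects all records; a null record means the input is exhausted.
    static List<String> readAll(LazyEndReader reader) {
        List<String> records = new ArrayList<>();
        while (!reader.reachedEnd()) {
            String record = reader.nextRecord();
            if (record == null) {
                continue; // reachedEnd() is now true, so the loop exits
            }
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> records =
                readAll(new LazyEndReader(Arrays.asList("t1", "t2", "t3", "t4")));
        // Assertions on count or content can be made here, after the loop.
        System.out.println(records.size());
    }
}
```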


---

[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/442


---

[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r26296323
  
    --- Diff: flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java ---
    @@ -0,0 +1,67 @@
    +package org.apache.flink.contrib.tweetinputformat.io;
    +
    +import org.apache.flink.api.common.io.DelimitedInputFormat;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
    +import org.codehaus.jackson.JsonParseException;
    +import org.json.simple.parser.JSONParser;
    +import org.json.simple.parser.ParseException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +
    +
    +public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implements ResultTypeQueryable<Tweet> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class);
    +
    +    @Override
    +    public Tweet nextRecord(Tweet record) throws IOException {
    +        Boolean result = false;
    +
    +        do {
    +            try {
    +                record.reset(0);
    +                record = super.nextRecord(record);
    +                result = true;
    +
    +            } catch (JsonParseException e) {
    +                result = false;
    +
    +            }
    +        } while (!result);
    +
    +        return record;
    +    }
    +
    +    @Override
    +    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException {
    +
    +
    +        InputStreamReader jsonReader = new InputStreamReader(new ByteArrayInputStream(bytes));
    +        jsonReader.skip(offset);
    +
    +        JSONParser parser = new JSONParser();
    --- End diff --
    
    It works now, thanks.


---

[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-82261434
  
    DONE


---

[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-78480464
  
    @StephanEwen  I have added the license. 


---

[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-82260288
  
    Yes, this can be the problem. Can you add a license header (with comments) to this file?


---

[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r26300021
  
    --- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java ---
    @@ -0,0 +1,76 @@
    +package org.apache.flink.contrib;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +import java.io.File;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +
    +public class SimpleTweetInputFormatTest {
    +
    +    private Tweet tweet;
    +
    +    private SimpleTweetInputFormat simpleTweetInputFormat;
    +
    +    private FileInputSplit fileInputSplit;
    +
    +    protected Configuration config;
    +
    +    protected File tempFile;
    +
    +
    +    @Before
    +    public void testSetUp() {
    +
    +
    +        simpleTweetInputFormat = new SimpleTweetInputFormat();
    +
    +        File jsonFile = new  File("../flink-contrib/src/main/resources/HashTagTweetSample.json");
    +
    +        fileInputSplit = new FileInputSplit(0, new Path(jsonFile.getPath()), 0, jsonFile.length(), new String[] {"localhost"});
    +    }
    +
    +    @Test
    +    public void testTweetInput() throws Exception {
    +
    +
    +        simpleTweetInputFormat.open(fileInputSplit);
    +        List<String> result;
    +
    +        while (!simpleTweetInputFormat.reachedEnd()) {
    +            tweet = new Tweet();
    +            tweet = simpleTweetInputFormat.nextRecord(tweet);
    +
    +            if(tweet != null){
    --- End diff --
    
    Cool. 
    Then I'd recommend adding a similar test to your test.


---

[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-81837584
  
    Have a look at the build server logs. They still complain about unapproved license headers.


---

[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r26300742
  
    --- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java ---
    @@ -0,0 +1,76 @@
    +package org.apache.flink.contrib;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +import java.io.File;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +
    +public class SimpleTweetInputFormatTest {
    +
    +    private Tweet tweet;
    +
    +    private SimpleTweetInputFormat simpleTweetInputFormat;
    +
    +    private FileInputSplit fileInputSplit;
    +
    +    protected Configuration config;
    +
    +    protected File tempFile;
    +
    +
    +    @Before
    +    public void testSetUp() {
    +
    +
    +        simpleTweetInputFormat = new SimpleTweetInputFormat();
    +
    +        File jsonFile = new  File("../flink-contrib/src/main/resources/HashTagTweetSample.json");
    +
    +        fileInputSplit = new FileInputSplit(0, new Path(jsonFile.getPath()), 0, jsonFile.length(), new String[] {"localhost"});
    +    }
    +
    +    @Test
    +    public void testTweetInput() throws Exception {
    +
    +
    +        simpleTweetInputFormat.open(fileInputSplit);
    +        List<String> result;
    +
    +        while (!simpleTweetInputFormat.reachedEnd()) {
    +            tweet = new Tweet();
    +            tweet = simpleTweetInputFormat.nextRecord(tweet);
    +
    +            if(tweet != null){
    --- End diff --
    
    DONE


---

[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-95866843
  
    @aljoscha you were right. I could not find the code in my repo; somehow it was lost.
    
    I have now created a new PR, #621, which has the same commit as #442.
    
    Please let me know if there is any issue.


---

[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/442#issuecomment-83455819
  
    Your own tests fail for this code, probably because they get thrown off by the comment in the sample JSON file.
    
    One way to make this work is to exclude this file from the RAT check and add a notice text file next to it that states the license header.
    Another is to make the input format aware of line comments (starting with `//` or `#`) and write the header as such comments.
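
The second option (skipping line comments that start with `//` or `#`) could look roughly like the sketch below. It is an assumption-laden illustration, not the code of this pull request: `CommentAwareLines`, `isCommentLine`, and `dataLines` are hypothetical names, and the byte-slice signature only mirrors the `(bytes, offset, numBytes)` shape seen in `readRecord`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class CommentAwareLines {
    // True when the slice, after leading spaces or tabs, starts with "//" or "#".
    static boolean isCommentLine(byte[] bytes, int offset, int numBytes) {
        int pos = offset;
        int limit = offset + numBytes;
        while (pos < limit && (bytes[pos] == ' ' || bytes[pos] == '\t')) {
            pos++;
        }
        if (pos >= limit) {
            return false; // blank line, not a comment
        }
        if (bytes[pos] == '#') {
            return true;
        }
        return pos + 1 < limit && bytes[pos] == '/' && bytes[pos + 1] == '/';
    }

    // Splits the content on '\n' and keeps lines that are neither empty nor comments.
    static List<String> dataLines(String content) {
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        List<String> result = new ArrayList<>();
        int start = 0;
        for (int i = 0; i <= bytes.length; i++) {
            if (i == bytes.length || bytes[i] == '\n') {
                int length = i - start;
                if (length > 0 && !isCommentLine(bytes, start, length)) {
                    result.add(new String(bytes, start, length, StandardCharsets.UTF_8));
                }
                start = i + 1;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String sample = "# license header line\n// another header line\n{\"id\":1}\n{\"id\":2}\n";
        System.out.println(dataLines(sample));
    }
}
```

In a real input format, a check like `isCommentLine` would presumably run at the start of `readRecord`, before the bytes are handed to the JSON parser.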


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

Posted by Elbehery <gi...@git.apache.org>.
Github user Elbehery commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r26299783
  
    --- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java ---
    @@ -0,0 +1,76 @@
    +package org.apache.flink.contrib;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +import java.io.File;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +
    +public class SimpleTweetInputFormatTest {
    +
    +    private Tweet tweet;
    +
    +    private SimpleTweetInputFormat simpleTweetInputFormat;
    +
    +    private FileInputSplit fileInputSplit;
    +
    +    protected Configuration config;
    +
    +    protected File tempFile;
    +
    +
    +    @Before
    +    public void testSetUp() {
    +
    +
    +        simpleTweetInputFormat = new SimpleTweetInputFormat();
    +
    +        File jsonFile = new  File("../flink-contrib/src/main/resources/HashTagTweetSample.json");
    +
    +        fileInputSplit = new FileInputSplit(0, new Path(jsonFile.getPath()), 0, jsonFile.length(), new String[] {"localhost"});
    +    }
    +
    +    @Test
    +    public void testTweetInput() throws Exception {
    +
    +
    +        simpleTweetInputFormat.open(fileInputSplit);
    +        List<String> result;
    +
    +        while (!simpleTweetInputFormat.reachedEnd()) {
    +            tweet = new Tweet();
    +            tweet = simpleTweetInputFormat.nextRecord(tweet);
    +
    +            if(tweet != null){
    --- End diff --
    
    I think this behavior is intended. While checking the DelimitedInputFormat unit test I found the following, so I will edit my test case to fix the problem.
    ![selection_019](https://cloud.githubusercontent.com/assets/2375289/6618391/d3366786-c8c2-11e4-8854-2a38a8da4da3.png)
     


---