Posted to user@flink.apache.org by Vijay Balakrishnan <bv...@gmail.com> on 2020/09/14 21:18:36 UTC

Get only the 1st gz file from an s3 folder

Hi,
I am able to read *.gz files from an s3 folder. I want to *get only the 1st gz file*
from the s3 folder, sort the events of just that file into an ordered map
as below, and use *orderedMap.firstKey() as the 1st event timestamp*.
I then want to *pass this 1st event timestamp to all TaskManagers, along
with a single current time* as an epoch time.


final DataStreamSource<String> stringDataStreamSource = env.readTextFile(s3Folder);
final SingleOutputStreamOperator<Map<String, Map<String, Object>>> orderedMapOutput =
    stringDataStreamSource.map(new MapFunction<String, Map<String, Map<String, Object>>>() {
        @Override
        public Map<String, Map<String, Object>> map(String jsonStr) throws Exception {
            // this shows the proper json string from within the gz file
            logger.info("record written:{}", jsonStr);
            Map<String, Object> resultMap = fromJson(jsonStr); // deserialize json
            // sort by event_timestamp
            Map<String, Map<String, Object>> orderedMap = new TreeMap<>();
            if (resultMap != null) {
                Object eventTsObj = resultMap.get(EVENT_TIMESTAMP);
                if (eventTsObj != null) {
                    String eventTS = (String) eventTsObj;
                    orderedMap.put(eventTS, resultMap);
                }
            } else {
                logger.warn("Could not deserialize:{}", jsonStr);
            }
            return orderedMap;
        }
    });
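
To make the intent concrete, here is a rough plain-Java sketch (no Flink involved) of what I am after for the 1st file only. Unlike the map() above, which creates a new single-entry TreeMap for every record, it collects all records of one file into one TreeMap and then takes firstKey(). The class name FirstEventTimestamp, the method names and the regex-based field lookup are made up for illustration. It assumes the JSON field is literally named event_timestamp, holds a string, and sorts correctly as a string (for example fixed-width ISO-8601).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;

// Hypothetical helper, not part of Flink or of the job above.
class FirstEventTimestamp {

    // Assumption: the field is named "event_timestamp" and its value is a JSON string.
    private static final Pattern EVENT_TS =
        Pattern.compile("\"event_timestamp\"\\s*:\\s*\"([^\"]*)\"");

    // Puts every record that has a timestamp into ONE TreeMap and returns the smallest key.
    public static Optional<String> firstTimestamp(Iterable<String> jsonLines) {
        TreeMap<String, String> ordered = new TreeMap<>();
        for (String line : jsonLines) {
            Matcher m = EVENT_TS.matcher(line);
            if (m.find()) {
                ordered.put(m.group(1), line);
            }
        }
        return ordered.isEmpty() ? Optional.empty() : Optional.of(ordered.firstKey());
    }

    // Reads all lines of a single gzip-compressed stream (e.g. the 1st gz file).
    public static List<String> readGzLines(InputStream gzStream) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new GZIPInputStream(gzStream), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // In-memory sample records standing in for the contents of the 1st gz file.
        List<String> sample = Arrays.asList(
            "{\"event_timestamp\":\"2020-09-14T21:05:00Z\",\"id\":\"b\"}",
            "{\"event_timestamp\":\"2020-09-14T21:00:00Z\",\"id\":\"a\"}",
            "{\"id\":\"no-timestamp\"}");
        // The single current time, taken once, that should travel with the 1st timestamp.
        long currentEpochMillis = System.currentTimeMillis();
        System.out.println(firstTimestamp(sample).orElse("none") + " @ " + currentEpochMillis);
    }
}
```

The pair (1st event timestamp, current epoch time) would be computed once like this before the job starts. How best to hand it to every TaskManager is the part I am unsure about.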

TIA,