Posted to issues@metron.apache.org by GitBox <gi...@apache.org> on 2019/02/11 22:48:04 UTC

[GitHub] nickwallen commented on a change in pull request #1330: METRON-1968: Messages are lost when a parser produces multiple messages and batch size is greater than 1

nickwallen commented on a change in pull request #1330: METRON-1968: Messages are lost when a parser produces multiple messages and batch size is greater than 1
URL: https://github.com/apache/metron/pull/1330#discussion_r255726568
 
 

 ##########
 File path: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
 ##########
 @@ -227,110 +118,93 @@ public void write( String sensorType
       batchTimeoutMap.put(sensorType, batchTimeoutInfo);
     }
 
-    Collection<Tuple> tupleList = sensorTupleMap.get(sensorType);
-    if (tupleList == null) {
+    if (messageList.isEmpty()) {
       //This block executes at the beginning of every batch, per sensor.
-      tupleList = createTupleCollection();
-      sensorTupleMap.put(sensorType, tupleList);
       batchTimeoutInfo[LAST_CREATE_TIME_MS] = clock.currentTimeMillis();
       //configurations can change, so (re)init getBatchTimeout(sensorType) at start of every batch
       int batchTimeoutSecs = configurations.getBatchTimeout(sensorType);
       if (batchTimeoutSecs <= 0 || batchTimeoutSecs > defaultBatchTimeout) {
         batchTimeoutSecs = defaultBatchTimeout;
       }
       batchTimeoutInfo[TIMEOUT_MS] = TimeUnit.SECONDS.toMillis(batchTimeoutSecs);
+      LOG.debug("Setting batch timeout to {} for sensor {}.", batchTimeoutInfo[TIMEOUT_MS], sensorType);
     }
-    tupleList.add(tuple);
 
-    List<MESSAGE_T> messageList = sensorMessageMap.get(sensorType);
-    if (messageList == null) {
-      messageList = new ArrayList<>();
-      sensorMessageMap.put(sensorType, messageList);
-    }
-    messageList.add(message);
+    messageList.put(messageId, message);
 
     //Check for batchSize flush
-    if (tupleList.size() >= batchSize) {
-      flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy, tupleList, messageList);
+    if (messageList.size() >= batchSize) {
+      LOG.debug("Batch size of {} reached. Flushing {} messages for sensor {}.", batchSize, messageList.size(), sensorType);
 
 Review comment:
   It might make sense to extract the "Should I flush?" logic into separate policy classes.
   
   For example, we could have a `Policy` interface that the `BulkWriterComponent` interacts with. The `BulkWriterComponent` would be initialized with zero or more `Policy` implementations and would simply ask each `Policy`, "Hey, do I need to flush?"
   
   We would then have two policies: one that flushes when a max batch size is reached and another that flushes when a max timeout is exceeded. Keeping these separate would make the code much simpler and easier to test.
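
A rough sketch of what this could look like, just to make the idea concrete. Everything here is hypothetical: `FlushPolicy`, `BatchSizePolicy`, `BatchTimeoutPolicy`, `FlushPolicies`, and the `shouldFlush` signature are names made up for illustration and are not part of Metron. The `>=` comparisons are an assumption that mirrors the `messageList.size() >= batchSize` check in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical interface: answers "do I need to flush?" for one sensor's batch.
interface FlushPolicy {
  boolean shouldFlush(String sensorType, int currentBatchSize, long batchAgeMillis);
}

// Flushes once the batch holds at least maxBatchSize messages.
final class BatchSizePolicy implements FlushPolicy {
  private final int maxBatchSize;

  BatchSizePolicy(int maxBatchSize) {
    this.maxBatchSize = maxBatchSize;
  }

  @Override
  public boolean shouldFlush(String sensorType, int currentBatchSize, long batchAgeMillis) {
    return currentBatchSize >= maxBatchSize;
  }
}

// Flushes once the batch has been open for at least maxAgeMillis.
final class BatchTimeoutPolicy implements FlushPolicy {
  private final long maxAgeMillis;

  BatchTimeoutPolicy(long maxAgeMillis) {
    this.maxAgeMillis = maxAgeMillis;
  }

  @Override
  public boolean shouldFlush(String sensorType, int currentBatchSize, long batchAgeMillis) {
    return batchAgeMillis >= maxAgeMillis;
  }
}

// Stand-in for the part of the writer component that consults the policies.
final class FlushPolicies {
  private final List<FlushPolicy> policies;

  FlushPolicies(FlushPolicy... policies) {
    this.policies = new ArrayList<>(Arrays.asList(policies));
  }

  // A flush is needed as soon as any single policy asks for one.
  boolean shouldFlush(String sensorType, int currentBatchSize, long batchAgeMillis) {
    for (FlushPolicy policy : policies) {
      if (policy.shouldFlush(sensorType, currentBatchSize, batchAgeMillis)) {
        return true;
      }
    }
    return false;
  }
}
```

With something like this, `write(...)` would only need to compute the current batch size and age and call `shouldFlush(...)` once. Each policy can be unit tested on its own with plain numbers, without a clock or a writer. Note that in this sketch a component configured with zero policies never flushes, which may or may not be the behavior we want.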
   
