You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/03 18:57:02 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-717] Filter Out Empty MultiWorkUnits

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c320258  [GOBBLIN-717] Filter Out Empty MultiWorkUnits
c320258 is described below

commit c32025884987aee7044af0ee0c9f49a4ce31a6fe
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Wed Apr 3 11:56:51 2019 -0700

    [GOBBLIN-717] Filter Out Empty MultiWorkUnits
    
    Closes #2584 from ZihanLi58/zihli-branch
---
 .../kafka/workunit/packer/KafkaWorkUnitPacker.java    | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 676387d..a8dfae4 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
 
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -298,11 +299,19 @@ public abstract class KafkaWorkUnitPacker {
       addWorkUnitToMultiWorkUnit(group, lightestMultiWorkUnit);
       pQueue.add(lightestMultiWorkUnit);
     }
+    LinkedList<MultiWorkUnit> pQueue_filtered = new LinkedList();
+    while(!pQueue.isEmpty())
+    {
+      MultiWorkUnit multiWorkUnit = pQueue.poll();
+      if(multiWorkUnit.getWorkUnits().size() != 0)
+      {
+        pQueue_filtered.offer(multiWorkUnit);
+      }
+    }
 
-    logMultiWorkUnitInfo(pQueue);
-
-    double minLoad = getWorkUnitEstLoad(pQueue.peekFirst());
-    double maxLoad = getWorkUnitEstLoad(pQueue.peekLast());
+    logMultiWorkUnitInfo(pQueue_filtered);
+    double minLoad = getWorkUnitEstLoad(pQueue_filtered.peekFirst());
+    double maxLoad = getWorkUnitEstLoad(pQueue_filtered.peekLast());
     LOG.info(String.format("Min load of multiWorkUnit = %f; Max load of multiWorkUnit = %f; Diff = %f%%", minLoad,
         maxLoad, (maxLoad - minLoad) / maxLoad * 100.0));
 
@@ -310,7 +319,7 @@ public abstract class KafkaWorkUnitPacker {
     this.state.setProp(MAX_MULTIWORKUNIT_LOAD, maxLoad);
 
     List<WorkUnit> multiWorkUnits = Lists.newArrayList();
-    multiWorkUnits.addAll(pQueue);
+    multiWorkUnits.addAll(pQueue_filtered);
     return multiWorkUnits;
   }