You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/03 18:57:02 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-717] Filter Out Empty MultiWorkUnits
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c320258 [GOBBLIN-717] Filter Out Empty MultiWorkUnits
c320258 is described below
commit c32025884987aee7044af0ee0c9f49a4ce31a6fe
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Wed Apr 3 11:56:51 2019 -0700
[GOBBLIN-717] Filter Out Empty MultiWorkUnits
Closes #2584 from ZihanLi58/zihli-branch
---
.../kafka/workunit/packer/KafkaWorkUnitPacker.java | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 676387d..a8dfae4 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
import java.util.Collections;
import java.util.Comparator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -298,11 +299,19 @@ public abstract class KafkaWorkUnitPacker {
addWorkUnitToMultiWorkUnit(group, lightestMultiWorkUnit);
pQueue.add(lightestMultiWorkUnit);
}
+ LinkedList<MultiWorkUnit> pQueue_filtered = new LinkedList();
+ while(!pQueue.isEmpty())
+ {
+ MultiWorkUnit multiWorkUnit = pQueue.poll();
+ if(multiWorkUnit.getWorkUnits().size() != 0)
+ {
+ pQueue_filtered.offer(multiWorkUnit);
+ }
+ }
- logMultiWorkUnitInfo(pQueue);
-
- double minLoad = getWorkUnitEstLoad(pQueue.peekFirst());
- double maxLoad = getWorkUnitEstLoad(pQueue.peekLast());
+ logMultiWorkUnitInfo(pQueue_filtered);
+ double minLoad = getWorkUnitEstLoad(pQueue_filtered.peekFirst());
+ double maxLoad = getWorkUnitEstLoad(pQueue_filtered.peekLast());
LOG.info(String.format("Min load of multiWorkUnit = %f; Max load of multiWorkUnit = %f; Diff = %f%%", minLoad,
maxLoad, (maxLoad - minLoad) / maxLoad * 100.0));
@@ -310,7 +319,7 @@ public abstract class KafkaWorkUnitPacker {
this.state.setProp(MAX_MULTIWORKUNIT_LOAD, maxLoad);
List<WorkUnit> multiWorkUnits = Lists.newArrayList();
- multiWorkUnits.addAll(pQueue);
+ multiWorkUnits.addAll(pQueue_filtered);
return multiWorkUnits;
}