You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/02 23:52:24 UTC
[incubator-pinot] 07/08: Changes in test to make it compile
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit eb0e4c782829dd3d33ae50ee91def7e09028b090
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Dec 31 15:49:33 2020 -0800
Changes in test to make it compile
---
.../controller/helix/core/PinotHelixResourceManager.java | 4 ++--
.../core/realtime/PinotLLCRealtimeSegmentManager.java | 2 +-
.../core/realtime/PinotLLCRealtimeSegmentManagerTest.java | 15 +++++++++++----
3 files changed, 14 insertions(+), 7 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index e80c06b..5388eeb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1337,7 +1337,7 @@ public class PinotHelixResourceManager {
idealState = PinotTableIdealStateBuilder
.buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
_enableBatchMessageMode);
- _pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState);
+ _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
LOGGER.info("Successfully setup table for SHARDED consumers for {} ", realtimeTableName);
} else {
@@ -1366,7 +1366,7 @@ public class PinotHelixResourceManager {
idealState = PinotTableIdealStateBuilder
.buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
_enableBatchMessageMode);
- _pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState);
+ _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
} else {
LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index d899b4c..cf3a401 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -207,7 +207,7 @@ public class PinotLLCRealtimeSegmentManager {
/**
* Sets up the realtime table ideal state for a table of consumer type SHARDED
*/
- public void setupNewTable(TableConfig tableConfig, IdealState idealState) {
+ public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");
String realtimeTableName = tableConfig.getTableName();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 42bdedc..75c8057 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -907,15 +907,22 @@ public class PinotLLCRealtimeSegmentManagerTest {
@Override
void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
- String newSegmentName, SegmentAssignment segmentAssignment,
+ List<String> newSegmentNames, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName,
- newSegmentName, segmentAssignment, instancePartitionsMap);
+ null, segmentAssignment, instancePartitionsMap);
+ for (String segmentName : newSegmentNames) {
+ updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null,
+ segmentName, segmentAssignment, instancePartitionsMap);
+ }
}
@Override
- List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
- return IntStream.range(0, _numPartitions).mapToObj(FakePartitionGroupMetadata::new).collect(Collectors.toList());
+ List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
+ List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
+ return IntStream.range(0, _numPartitions).mapToObj(i -> new PartitionGroupInfo(i,
+ getPartitionOffset(streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, i).toString()))
+ .collect(Collectors.toList());
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org