Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/01 07:17:13 UTC

(pinot) branch maintain-pool-selection-for-minimizeDataMovement updated (08efc8c2f1 -> a286bd868f)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch maintain-pool-selection-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git


    omit 08efc8c2f1 Add logic to consider the case when instances are moved across pools
    omit 83911abc93 Address PR comments
    omit d9ae2e9432 Update pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
    omit 95ed621750 Enhance the minimizeDataMovement to keep the existing pool assignment
     add 6638d4edcb [bugfix]Add logs to track sequence of events for table creation (#11946)
     add d177866527 [bugfix][multistage] explicit warning flags set on each stage stats (#11936)
     add 8f69299aef upgrade lucene to 9.8.0
     add 00cf7a8295 Try lucene-backward-codecs package
     add 06b58d4dcd adding lucene 80 compatible test
     add 45f186903e Support constant filter in QueryContext, and make server able to handle it (#11956)
     add 8961e5460f added check for illegal character '/' in taskName
     add 9092244e0b Added invalid argument related tests for AdhocTaskConfig and rephrased an error message.
     add b5e982333d [test][multistage] enhance query plan test (#11966)
     add e3f2777e36 Updated for commons-configuration2 in PinotConfiguartion (#11916)
     add 49faa9424a register the new segment when it's fully initalized by partitionUpsertMetadataManager (#11964)
     add fc2395a29c Added invalid argument related tests for AdhocTaskConfig and rephrased an error message.
     add 3e2aa44535 Allow user specify local temp directory for quickstart
     add 972b555cc5 fixing dateTimeConvert
     add aac489864b Ability to fetch Segment metadata in batches
     add 330938fe4c Make groupBy trim size configurable at Broker (#11958)
     add 40ede67d76 fixing the wrong import for Preconditions (#11979)
     add b92e4da2a0 Bump io.netty:netty-bom from 4.1.94.Final to 4.1.100.Final (#11972)
     add a40ed04bac Bump org.scalatest:scalatest-maven-plugin from 1.0 to 2.2.0 (#11973)
     add a986dd1536 fix for #11974, exclude `module-info.class` (#11975)
     add 73d82ec666 [multistage][bugfix] fix operator eos pull (#11970)
     add 10afb1a89d [enhance]: Add cache for Bcrypt password
     add bb52625f8a instrument building datatable (#11942)
     add 2beb9a4938 Bump commons-net:commons-net from 3.1 to 3.10.0 (#11982)
     add b38bd9fb34 [hotfix] fix filter merge after rule (#11989)
     add 610c36afb6 [test][multistage] adding filter pushdown tests (#11994)
     add f30fdee7ba [multistage][feature] leaf planning with multi-semi join support (#11937)
     add 0dd2522afd add an option to skip controller cert validation in AddTableCommand (#11967)
     add b2b802a427 fix for #11996 which corrects property name typo (#11997)
     add e26404ca54 Bump com.jayway.jsonpath:json-path from 2.7.0 to 2.8.0 (#11987)
     add b685a2e318 Bump simpleclient_common.version from 0.8.1 to 0.16.0 (#11986)
     add 53883aedb5 Adding byte functions for UUIDs (#11988)
     add 5f22d16086 Fix fromULL scalar function (#11995)
     add 7217bb9d16 [multistage][bugfix] leaf limit refactor issue (#12001)
     add 62abf1fb22 Fix the test failures caused by instance drop failure (#12002)
     add 6d1b0609c5 Fix the misuse of star-tree when all predicates are always false under OR (#12003)
     add 88fe079ecd Fix the race condition of concurrent modification to segment data managers (#12004)
     add 45f7d44dc9 Bump com.mercateo:test-clock from 1.0.2 to 1.0.4 (#12005)
     add 5f601bcc01 Bump circe.version from 0.14.2 to 0.14.6 (#12006)
     add 1ab9e62b6d Add query option override for Broker MinGroupTrimSize (#11984)
     add 71e9c2cd25 Replace timer with scheduled executor service in IngestionDelayTracker (#11849)
     add 11151e6df6 Bump net.nicoulaj.maven.plugins:checksum-maven-plugin from 1.8 to 1.11 (#12008)
     add 41d1a12160 Bump com.google.code.gson:gson from 2.2.4 to 2.10.1 (#12009)
     add 09da0eac74 [multistage][feature] support RelDistribution trait planning (#11976)
     add 1d8f2b6366 canonicalize SqlKind.OTHERS as well as SqlKind.OTHER_FUNCTIONS (#12025)
     add 8c604f8354 [Flaky-test] Fix PinotTenantRestletResourceTest (#12019)
     add 8d93a169a1 Bump grpc.version from 1.53.0 to 1.59.0 (#12023)
     add 968c0a3d2a Bump dropwizard-metrics.version from 4.2.2 to 4.2.22 (#12022)
     add 0cb43eb499 Bump org.jacoco:jacoco-maven-plugin from 0.8.9 to 0.8.11 (#12024)
     add c1fdb66ae8 Add upsert config - outOfOrderRecordColumn to track out-of-order events (#11877)
     add 244c947fdd Fix flaky PinotTenantRestletResourceTest (#12026)
     add 6aecd41641 Add a check to enable size based threshold for realtime tables (#12016)
     add f7f82608e0 Adds support for leveraging StarTree index in conjunction with filtered aggregations (#11886)
     add 32db500160 Fix derived column from MV column (#12028)
     add 8e84fc3fc5 Optimize segment commit to not read partition group metadata (#11943)
     add 54e828ad14 Bump com.google.code.findbugs:jsr305 from 3.0.0 to 3.0.2 (#12031)
     add f435f4c7e4 Bump org.codehaus.mojo:appassembler-maven-plugin from 1.10 to 2.1.0 (#12030)
     add 61556af200 fix by checking the index first (#12029)
     add 35748a013b Explicit null handling (#11960)
     add 2c88c4f74d fix flakyness by replacing HashSet and HashMap with LinkedHashSet and LinkedHashMap (#11941)
     add 31c5aaec0a Bump org.reactivestreams:reactive-streams from 1.0.3 to 1.0.4 (#12033)
     add e4d3a8dde8 Bump io.grpc:grpc-context from 1.14.0 to 1.59.0 (#12034)
     add 22403e5675 [multistage][bugfix] fix multi-semi-join issues (#12038)
     add b3af476b83 Prevent inverted index on a non dictionary column in table config (#12043)
     add 95d4950dab Added new SpecialValueTransformer and tests.
     add 5c8f43ead1 fix typo in comment.
     add 2c05f8dca9 addressed comments and added test for ensuring order of transformers.
     add 06e91c13a1 added code to remove NaN from multivalued columns and modified order of transformers.
     add 05be5cfd9a modifed test to ensure order of transformers.
     add f79b618141 fixed note in SpecialValueTransformer.
     add 97f8f5f6cb Disable rebase/merge merge buttons (#12051)
     add a37ced6ec9 Fix rebalance on upsert table (#12054)
     add bfd7dce57e Fix the memory leak issue on `CommonsConfigurationUtils` (#12056)
     add 6fe2d25c21 Fix incorrect handling of consumer creation errors (#12045)
     add d654cc9db2 Add a new MV forward index to only store unique MV values (#11993)
     add 3bd74b36e4 Allow optional segments that can be skipped by servers without failing the query (#11978)
     add 37270f749c [multistage][cleanup] remove some unused rules (#12052)
     add c57117a750 [multistage] don't prune project after agg during relBuilder (#12058)
     add b9ed378355 [multistage][refactor] clean up planner (#12070)
     add 88fcd0e4ee Fix bug with silently ignoring force commit call failures (#12044)
     add cc7e7a8b86 Introduce low disk mode to table rebalance (#12072)
     add d7c76b9fbb no need for segment locks during segment preloading (#12077)
     add fe072c63bd Remove what pinot doesn't do (#12075)
     add b131f89461 make deep store upload retry async with configurable parallelism (#12017)
     add 69fbeeffe6 Enabling SegmentGenerationAndPushTask to push segment to realtime table (#12084)
     add 9497f06cba Fix default brokerUpdateFrequencyInMillis for connector (#12093)
     add 6d19f4a3b4 Bump up the parquet version to 1.13.1 (#12076)
     add 12beff75df Bump xml-apis:xml-apis from 1.4.01 to 2.0.2 (#12082)
     add d7e2e7834c Fixed log4j location in values.yaml #12010 (#12083)
     add 9d779b4757 Correct path of default log4j configures (#12069)
     add 96c2f3dd9d fix by repalcing HashSet with TreeSet (#12047)
     add b25f7cf6c1 Adds support for CTRL key as a modifier for Query shortcuts (#12087)
     add 67cdb27721 Configurable Lucene analyzer (#12027)
     add 559623054b Bump com.azure:azure-identity from 1.8.1 to 1.11.1 (#12095)
     add 49d0ff01a4 Add support for murmur3 as a partition function (#12049)
     add a9e319964f Theta Sketch Aggregation Enhancements (#12042)
     add 9013e724a7 Fix compilation issue for lucene analyzer class change (#12101)
     add 49804a4f56 Add support for retention on deleted keys of upsert tables (#12037)
     add 3474db4abb [test] fix multi-stage explain error test (#12098)
     add 48dd67e761 Bump net.java.dev.javacc:javacc from 7.0.10 to 7.0.13 (#12103)
     add 3811240629 need to synchronize replacing upsert segment (#12105)
     add 47cfcb382c Support Vector index and HNSW as the first implementation (#11977)
     add e62db612c9 Metrics for Table Disabled and Consumption Paused (#12000)
     add 45d9a4b8cf UI: add lowDiskMode option in rebalance operation (#12112)
     add e207844038 [multistage]partition assignment refactor (#12079)
     add 20e7fdc1d8 Bump org.apache.maven.plugins:maven-assembly-plugin from 3.1.1 to 3.6.0 (#12109)
     add f5f878e1ba Fix helm chart server probe endpoint backward incompatible (#12114)
     add d7dc4c5eff Bump org.apache.avro:avro from 1.10.2 to 1.11.3 (#12116)
     add 56ecafc17e Add singleton registry for all metrics (#12119)
     add 27cc8a796f Reduce logging volume to log the conversions per record instead of per conversion for SpecialValueTransformer  (#12121)
     add ed8a251f07 Bump com.google.guava:guava from 32.0.1-jre to 32.1.3-jre (#12124)
     add a4e5fa3326 Bump com.github.luben:zstd-jni from 1.5.5-6 to 1.5.5-11 (#12125)
     add d68a5a8dac PR to address Issue #12127 - Refactor foreign logger initialization in multiple classes (#12128)
     add f4b5de6b8a [hotfix] remove non-exist vector transform (#12140)
     add 92e26a5847 Introduce UpsertContext to simplify the upsert metadata manager constructor (#12120)
     add d38e15d1fa fixes #12136 (#12137)
     add 05afb65ec7 fix multiple instances where the number of placeholders did not match the number of arguments to logging methods (#12134)
     add a47b9cff16 Bump log4j.version from 2.20.0 to 2.22.0 (#12143)
     add f3b94f60e4 Fix for #12133 - add context and proper logging framework to OsCheck output (#12135)
     add 280d368a71 avoid unnecessary transformer (#12138)
     add 5b623855bc Bump org.apache.commons:commons-collections4 from 4.1 to 4.4 (#12149)
     add f22de0a4be Improve resource management with try-with-resource blocks.  Fix for #12129 (#12130)
     add 46ae9e97ff refine warning msg when preloading segments not exist on server (#12153)
     add 7514fc5978 Bump org.apache.zookeeper:zookeeper from 3.6.3 to 3.7.2 (#12152)
     add b44b3aeb8f Changes for migration to commons-configuration2 (#11985)
     add 618e049dde Bump org.apache.datasketches:datasketches-java from 4.1.0 to 5.0.0 (#12161)
     add 979c9125ed Bump com.google.api.grpc:proto-google-common-protos (#12159)
     add 0a124ee3be Bump org.glassfish.tyrus.bundles:tyrus-standalone-client (#12162)
     add aa834f4ccc Remove parameters from ThetaSketchAggregation function (#12147)
     add fd45c1bbd2 [feature] add support for StreamNative OAuth2 authentication for pulsar. (#12068)
     add 333e17c765 Update LICENSE-binary for commons-configuration2 upgrade. (#12165)
     add ccc5f34507 Bump org.apache.httpcomponents:httpclient from 4.5.13 to 4.5.14 (#12172)
     add 3d3012fd20 Bump com.gradle:common-custom-user-data-maven-extension (#12171)
     add c085f0497d Bump org.apache.yetus:audience-annotations from 0.13.0 to 0.15.0 (#12170)
     add f0b78b44e3 Bug fix: reset primary key count to 0 when table is deleted (#12169)
     add f36cc10f4f Support initializing broker tags from config (#12175)
     add 488b336ab4 Bump dropwizard-metrics.version from 4.2.22 to 4.2.23 (#12178)
     add fade75bf4e Fix rebalancer converge check to ensure EV is converged before reporting success (#12182)
     add c706fd04e5 Fix partition handling by always using string values (#12115)
     add 85b5779cff Support array literal functions (#12118)
     add 85d3850227 fix regex gen with escape issue on single quote (#12181)
     add 24f8433d5f Bump com.google.auto.service:auto-service from 1.0.1 to 1.1.1 (#12183)
     add 9d939a007e Add partition level Force Commit (#12088)
     add 9b6d0b7cb8 Bump org.freemarker:freemarker from 2.3.30 to 2.3.32 (#12192)
     add 50153796c6 Bump com.azure:azure-core from 1.37.0 to 1.45.1 (#12193)
     add 9cfce82385 Proper computation of realtime "segment.flush.threshold.size" in case of force-commit (#12188)
     add 95ec64e50e Bump io.grpc:grpc-context from 1.59.0 to 1.60.1 (#12198)
     add 344eba125e Bump org.apache.spark:spark-launcher_2.12 from 3.2.1 to 3.5.0 (#12199)
     add a20031528d Data generator reorganisation (#12122)
     add 50912eb0c4 Create DateTimeGenerator and add it to data generator (#12206)
     add 5b4e19aa4e fix: Pulsar OAuth2 Authentication Factory (#12195)
     add d4d910cec1 FileWriter bug fixes (#12208)
     add a0a9b6bca6 Specify Multi-Release property in manifest (#12131)
     add f9df57a9e3 Add column name to JSONParsing exception message during index build (#12151)
     add add2236a42 [hotfix] fix table name not escaped during routing (#12212)
     add d1cc17c579 [multistage][hotfix] use UTF-8 as default CharSet, this is also true for v1 engine (#12213)
     add 8f5fa804cd Add a 'lastUsed' option for resumeConsumption.consumeFrom (#12200)
     add 298e8d9191 Fix connection pool error by creating new default credential (#12221)
     add 436968e080 make all /size users render async (#12210)
     add 07bcab80f1 [test] add back quickstart streaming (#12231)
     add 388d394c7f Fix upsert tenant tag override check (#12233)
     add 23c1e5f3a5 [bugfix] fix literal query return multiple duplicate results (#12240)
     add d1817efff7 [multistage][bugfix] improve sort copy rule (#12237)
     add 7cb5973185 fix a deadlock due to getting segmentlock before snapshot lock when replacing segment (#12241)
     add 9947bc5916 refine how to take validDocIds snapshot (#12232)
     add d05e3bd05c [bugfix] Handle NPE in controller SQL Resource (#12211)
     add 8c86ad4652 fix a bug that would take validdocids snapshots redundantly (#12246)
     add 747e34dede Allow String / numeric data type for deleteRecordColumn config (#12222)
     add 7132a2203f [multistage][bugfix] sort copy rule to always push limit when no collation (#12251)
     add 2e367a202e UI: fix table link issue in task detail page (#12253)
     add b4fbfe9687 Making utility accessible to generate metadata file (#12255)
     add a4c3286018 add null handling to sketch group-by (#12259)
     add ae55a7abf8 Catch-all Regex for JXM -> Prom Exporter (#12073)
     add 19e74e80a8 Added dynamic SSL initialization support for the Kafka client (#12249)
     add 21f3d283d4 Allow server level configuration for Upsert metadata class (#11851)
     add 110c5b4947 Bump com.yscope.clp:clp-ffi from 0.4.3 to 0.4.4 (#12203)
     add 12c90b2155 Bump flink.version from 1.12.0 to 1.14.6 (#12202)
     add 8713dc045f Bump commons-codec:commons-codec from 1.15 to 1.16.0 (#12204)
     add 4ad36c3482 Remove TableDataManagerConfig and simplify TableDataManager construction (#12189)
     add 6bb387a10a Support array gen in literal evaluation (#12278)
     add 17e1aa11a4 Fix chained literal functions evaluation (#12248)
     add 894e56e0a2 Expose metric for table rebalance (#12270)
     add 7b2a82dd27 Modify distribution pom to include services (#12289)
     add 640ebe52cf Sticky query routing via query options (#12276)
     add 0f6015a5d3 JMX Exporter Preserve Original Regexes (#12295)
     add 5cc7231dc0 Backwards compatible theta sketch aggregation (#12288)
     add 7b69d094be Refactoring the upsert compaction related code (#12275)
     add ced6bc282e Use higher fetch timeout for Kinesis (#12214)
     add f1fec060a6 Support runtime reload for TLS resources (#12277)
     add eeaf1f0811 [feature] allow dim table config to detect/disallow duplicate PK  (#12290)
     add 35faeb6712 Revert allowing tag override with upserts (#12311)
     add 63e91ef95f Support table suffix in ZkBasicAuthAccessControlFactory (#12310)
     add 91ffcc759f Reduce Disk Footprint for Segment Processor Framework to Avoid Out of Disk Issues (#12220)
     add f43664dd13 Support server level consumption throttle (#12292)
     add 9c1bb02dec Misc fixes for upsert metadata manager (#12319)
     add 76d0eb25db Reduce Heap Usage of OnHeapStringDictionary (#12223)
     add 7978d29031 Mark distribution as multi-release (#12300)
     add 5a382f2e7d Shared aggregations in StarTree (#12164)
     add 2acf8ea354 correct errmsg format for Preconditions.check... (#12327)
     add 23dbb08e8d Make server resource classes configurable (#12324)
     add ed6761a982 Fix "rewind()" for CompactedPinotSegmentRecordReader (#12329)
     add 4823802886 Wire soft upsert delete for compaction task (#12330)
     add 6cc1915140 Add HttpHeaders in broker event listener requestContext (#12258)
     add e1d20d75d5 Making taskManager resources protected for derived classes to override in their setUp() method. (#12335)
     add 1a82ba6fc6 Add CPU metrics for minion purge task (#12337)
     add d5014786db Add first implementation of clpMatch that doesn't explicitly use indexes. (#12291)
     add 3ab28348fb auto reload TLS resources when loading from local files (#12325)
     add 8988b755d6 Remove segments with empty download url in UpsertCompactionTask (#12320)
     add dd8be2a229 add TextMatchFilterOptimizer to maximally push down `text_match` filters to Lucene (#12339)
     add 17db0fd17b add ScalingThreadPoolExecutor and use for realtime Lucene thread pool (#12274)
     new 9550545be3 Enhance the minimizeDataMovement to keep the existing pool assignment
     new 97ab0b7d45 Update pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
     new 2a934ef00d Address PR comments
     new 25b4c3a212 Add logic to consider the case when instances are moved across pools
     new a286bd868f Enhance algorithm

This update added new revisions after undoing existing ones; some revisions
that were in the old version of the branch are no longer in the new
version. This happens when a user force-pushes a change, producing a
repository history like this:

 * -- * -- B -- O -- O -- O   (08efc8c2f1)
            \
             N -- N -- N   refs/heads/maintain-pool-selection-for-minimizeDataMovement (a286bd868f)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
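The "omit"/"new" mechanics described above can be reproduced in a throwaway repository. This is a minimal sketch, not the actual Pinot history: the commit messages and the `old-tip` tag are invented for illustration, and the tag plays the role of the "other references" that keep omitted revisions alive.

```shell
set -e
cd "$(mktemp -d)"
git init -q demo && cd demo
git config user.email dev@example.com
git config user.name dev

git commit -q --allow-empty -m "B: common base"
git commit -q --allow-empty -m "O: old work"
git tag old-tip                    # another reference still refers to O

git reset -q --hard HEAD~1         # undo O, back to the common base B
git commit -q --allow-empty -m "N: new work"

# O is "omitted" from the branch, but not gone: old-tip still reaches it
git log --oneline HEAD             # shows only N and B
git log --oneline old-tip          # still shows O and B
```

Locally, `git reset --hard` plus a new commit is what a `--force` push publishes to the shared repository; a revision only becomes "discard" (gone forever) once no tag, branch, or other ref reaches it and it is garbage-collected.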


Summary of changes:
 .asf.yaml                                          |   7 +
 .github/workflows/scripts/.pinot_quickstart.sh     |  71 +-
 .mvn/extensions.xml                                |   2 +-
 LICENSE-binary                                     |   5 +-
 README.md                                          |   2 -
 compatibility-verifier/compCheck.sh                |   6 +
 .../config/queries/feature-test-1-sql.queries      |   6 +-
 .../queries/feature-test-2-sql-realtime.queries    |   4 +-
 .../query-results/feature-test-1-rest-sql.results  |   6 +-
 .../feature-test-2-sql-realtime.results            |   4 +-
 contrib/pinot-fmpp-maven-plugin/pom.xml            |   2 +-
 .../jmx_prometheus_javaagent/configs/broker.yml    |  26 +-
 .../configs/controller.yml                         |  26 +-
 .../etc/jmx_prometheus_javaagent/configs/pinot.yml |  27 +-
 .../jmx_prometheus_javaagent/configs/server.yml    |  27 +-
 helm/pinot/templates/server/statefulset.yaml       |   4 +-
 helm/pinot/values.yaml                             |  10 +-
 .../broker/api/resources/PinotBrokerDebug.java     |  90 ++-
 .../broker/ZkBasicAuthAccessControlFactory.java    |  26 +-
 .../broker/broker/helix/BaseBrokerStarter.java     |  25 +-
 .../requesthandler/BaseBrokerRequestHandler.java   |  75 +-
 .../requesthandler/GrpcBrokerRequestHandler.java   |  19 +-
 .../MultiStageBrokerRequestHandler.java            |  14 +-
 .../SingleConnectionBrokerRequestHandler.java      |   8 +-
 .../pinot/broker/routing/BrokerRoutingManager.java |  37 +-
 .../AdaptiveServerSelectorFactory.java             |   2 +
 .../instanceselector/BalancedInstanceSelector.java |  31 +-
 .../instanceselector/BaseInstanceSelector.java     |  45 +-
 .../routing/instanceselector/InstanceSelector.java |  21 +-
 .../instanceselector/InstanceSelectorFactory.java  |  30 +-
 .../MultiStageReplicaGroupSelector.java            |  38 +-
 .../ReplicaGroupInstanceSelector.java              |  55 +-
 .../StrictReplicaGroupInstanceSelector.java        |   9 +-
 .../MultiPartitionColumnsSegmentPruner.java        |   5 +-
 .../SinglePartitionColumnSegmentPruner.java        |   5 +-
 .../broker/broker/FakeStreamConsumerFactory.java   |   6 +
 .../broker/broker/HelixBrokerStarterTest.java      |   9 +-
 .../BaseBrokerRequestHandlerTest.java              |   9 +-
 .../instanceselector/InstanceSelectorTest.java     | 108 +--
 .../pinot/client/BrokerCacheUpdaterPeriodic.java   |   8 +-
 .../apache/pinot/client/utils/ConnectionUtils.java |   2 +-
 .../org/apache/pinot/client/PinotConnection.java   |   4 -
 .../org/apache/pinot/client/utils/DriverUtils.java |   2 +-
 pinot-common/pom.xml                               |  22 +-
 .../pinot/common/auth/AuthProviderUtils.java       |   3 +
 .../config/provider/AccessControlUserCache.java    | 243 +++---
 .../pinot/common/datatable/BaseDataTable.java      |   4 +
 .../apache/pinot/common/datatable/DataTable.java   |   5 +-
 .../pinot/common/datatable/DataTableImplV4.java    |   4 +
 .../pinot/common/function/FunctionRegistry.java    |  51 +-
 .../common/function/TransformFunctionType.java     |  10 +-
 .../common/function/scalar/ArrayFunctions.java     |  59 ++
 .../common/function/scalar/StringFunctions.java    |  33 +
 .../common/function/scalar/VectorFunctions.java    |  32 +
 .../apache/pinot/common/http/MultiHttpRequest.java |  51 +-
 .../pinot/common/metrics/AbstractMetrics.java      |   6 +
 .../pinot/common/metrics/ControllerGauge.java      |  10 +-
 .../pinot/common/metrics/ControllerMetrics.java    |  10 +
 .../apache/pinot/common/metrics/MinionMetrics.java |  10 +
 .../apache/pinot/common/metrics/MinionTimer.java   |   2 +-
 .../apache/pinot/common/metrics/ServerMeter.java   |   2 +
 .../apache/pinot/common/metrics/ServerTimer.java   |   5 +-
 .../protocols/SegmentCompletionProtocol.java       |   2 +
 .../apache/pinot/common/request/BrokerRequest.java |   2 +-
 .../apache/pinot/common/request/DataSource.java    |   2 +-
 .../apache/pinot/common/request/Expression.java    |   2 +-
 .../pinot/common/request/ExpressionType.java       |   2 +-
 .../org/apache/pinot/common/request/Function.java  |  38 +-
 .../apache/pinot/common/request/Identifier.java    |   2 +-
 .../pinot/common/request/InstanceRequest.java      | 186 ++++-
 .../java/org/apache/pinot/common/request/Join.java |   2 +-
 .../org/apache/pinot/common/request/JoinType.java  |   2 +-
 .../org/apache/pinot/common/request/Literal.java   | 469 ++++++++++-
 .../apache/pinot/common/request/PinotQuery.java    |   2 +-
 .../apache/pinot/common/request/QuerySource.java   |   2 +-
 .../common/request/context/FilterContext.java      |  73 +-
 .../common/request/context/LiteralContext.java     |  20 +
 .../request/context/RequestContextUtils.java       | 313 +++++---
 .../request/context/predicate/Predicate.java       |   3 +-
 .../predicate/VectorSimilarityPredicate.java       |  82 ++
 .../pinot/common/response/BrokerResponse.java      |   5 +
 .../pinot/common/response/ProcessingException.java |   2 +-
 .../response/broker/BrokerResponseNative.java      |  21 +-
 .../response/broker/BrokerResponseNativeV2.java    |  11 +-
 .../response/broker/BrokerResponseStats.java       |  14 +-
 .../restlet/resources/ValidDocIdMetadataInfo.java  |  56 ++
 .../common/tier/FixedTierSegmentSelector.java      |   2 +-
 .../org/apache/pinot/common/utils/BcryptUtils.java |  53 +-
 .../common/utils/FileUploadDownloadClient.java     |  10 +-
 .../apache/pinot/common/utils/PinotDataType.java   |   8 +-
 .../common/utils/ScalingThreadPoolExecutor.java    | 128 +++
 .../org/apache/pinot/common/utils/TlsUtils.java    | 258 +++++-
 .../common/utils/config/QueryOptionsUtils.java     |  12 +
 .../pinot/common/utils/grpc/GrpcQueryClient.java   |  24 +-
 .../pinot/common/utils/request/RequestUtils.java   |  49 ++
 .../main/java/org/apache/pinot/sql/FilterKind.java |   3 +-
 .../apache/pinot/sql/parsers/CalciteSqlParser.java |  20 +
 .../sql/parsers/rewriter/CLPDecodeRewriter.java    | 177 -----
 .../pinot/sql/parsers/rewriter/ClpRewriter.java    | 633 +++++++++++++++
 .../rewriter/CompileTimeFunctionsInvoker.java      |   9 +-
 .../sql/parsers/rewriter/ExprMinMaxRewriter.java   |  16 +-
 .../rewriter/PredicateComparisonRewriter.java      |  33 +-
 .../apache/pinot/common/data/FieldSpecTest.java    |  45 ++
 .../org/apache/pinot/common/data/SchemaTest.java   |  26 +
 .../function/FunctionDefinitionRegistryTest.java   |   6 +-
 .../pinot/common/http/MultiHttpRequestTest.java    | 106 ++-
 .../segment/ColumnPartitionMetadataTest.java       |   4 +-
 .../pinot/common/utils/FALFInternerTest.java       | 168 ++++
 .../utils/ScalingThreadPoolExecutorTest.java       |  69 ++
 .../apache/pinot/common/utils/TlsUtilsTest.java    | 286 +++++++
 .../common/utils/config/TableConfigSerDeTest.java  |   2 +-
 .../parsers/rewriter/CLPDecodeRewriterTest.java    |  65 --
 .../sql/parsers/rewriter/ClpRewriterTest.java      | 282 +++++++
 .../parsers/rewriter/ExprMinMaxRewriterTest.java   |   2 +-
 .../src/test/resources/tls/keystore-updated.p12    | Bin 0 -> 2581 bytes
 pinot-common/src/test/resources/tls/keystore.p12   | Bin 0 -> 2581 bytes
 .../src/test/resources/tls/truststore-updated.p12  | Bin 0 -> 1186 bytes
 pinot-common/src/test/resources/tls/truststore.p12 | Bin 0 -> 1186 bytes
 pinot-common/src/thrift/query.thrift               |   5 +
 pinot-common/src/thrift/request.thrift             |   1 +
 pinot-connectors/pinot-spark-2-connector/pom.xml   |   2 +-
 pinot-connectors/pinot-spark-3-connector/pom.xml   |   2 +-
 pinot-connectors/pinot-spark-common/pom.xml        |   4 +-
 .../common/reader/PinotServerDataFetcher.scala     |  11 +-
 .../pinot/controller/BaseControllerStarter.java    |   1 +
 .../apache/pinot/controller/ControllerConf.java    |   8 +-
 .../access/ZkBasicAuthAccessControlFactory.java    |  29 +-
 .../resources/LLCSegmentCompletionHandlers.java    |   6 +-
 .../api/resources/PinotQueryResource.java          |   4 +
 .../api/resources/PinotRealtimeTableResource.java  |  52 +-
 .../api/resources/PinotRunningQueryResource.java   |   2 +-
 .../api/resources/PinotTableRestletResource.java   |  71 +-
 .../controller/helix/ControllerRequestClient.java  |  10 +
 .../controller/helix/SegmentStatusChecker.java     |  36 +-
 .../helix/core/PinotHelixResourceManager.java      |  15 +-
 .../helix/core/SegmentDeletionManager.java         |   2 +-
 .../instance/FDAwareInstancePartitionSelector.java |   7 +-
 .../instance/InstancePartitionSelector.java        |  10 +-
 .../InstanceReplicaGroupPartitionSelector.java     | 768 +++++++++---------
 .../instance/InstanceTagPoolSelector.java          |  56 +-
 .../assignment/segment/SegmentAssignmentUtils.java |   8 +-
 .../segment/StrictRealtimeSegmentAssignment.java   | 174 ++--
 .../minion/generator/TaskGeneratorRegistry.java    |   2 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 320 +++++---
 .../segment/CommittingSegmentDescriptor.java       |  10 +
 .../segment/DefaultFlushThresholdUpdater.java      |   4 +-
 .../realtime/segment/FlushThresholdUpdater.java    |   4 +-
 .../segment/SegmentFlushThresholdComputer.java     |  43 +-
 .../SegmentSizeBasedFlushThresholdUpdater.java     |  14 +-
 .../helix/core/rebalance/RebalanceChecker.java     |   2 +-
 .../helix/core/rebalance/RebalanceConfig.java      |  16 +
 .../helix/core/rebalance/TableRebalancer.java      | 126 +--
 .../rebalance/ZkBasedTableRebalanceObserver.java   |   8 +
 .../helix/core/relocation/SegmentRelocator.java    |   2 +-
 .../recommender/data/DataGenerationHelpers.java    | 138 ++++
 .../recommender/data/generator/DataGenerator.java  | 104 +--
 .../data/generator/DataGeneratorSpec.java          | 116 ++-
 .../data/generator/DateTimeGenerator.java          |  51 ++
 .../data/generator/PatternSeasonalGenerator.java   |   2 +-
 .../data/generator/PatternSequenceGenerator.java   |   2 +-
 .../data/generator/PatternSpikeGenerator.java      |   2 +-
 .../data/generator/PatternStringGenerator.java     |   2 +-
 .../data/{generator => writer}/AvroWriter.java     |  72 +-
 .../recommender/data/writer/AvroWriterSpec.java    |  41 +-
 .../recommender/data/writer/CsvWriter.java         |  66 ++
 .../recommender/data/writer/FileWriter.java        |  81 ++
 .../recommender/data/writer/FileWriterSpec.java    |  35 +-
 .../recommender/data/writer/JsonWriter.java        |  33 +-
 .../controller/recommender/data/writer/Writer.java |  55 +-
 .../recommender/data/writer/WriterSpec.java        |  18 +-
 .../controller/recommender/io/InputManager.java    |   8 +-
 .../realtime/provisioning/MemoryEstimator.java     |  15 +-
 .../recommender/rules/impl/BloomFilterRule.java    |   7 +-
 .../recommender/rules/impl/FlagQueryRule.java      |   6 +-
 .../recommender/rules/impl/JsonIndexRule.java      |   2 +-
 .../NoDictionaryOnHeapDictionaryJointRule.java     |   5 +-
 .../rules/impl/PinotTablePartitionRule.java        |   7 +-
 .../recommender/rules/impl/RangeIndexRule.java     |   5 +-
 .../utils/QueryInvertedSortedIndexRecommender.java |   7 +-
 .../controller/util/CompletionServiceHelper.java   |  52 +-
 .../util/ServerSegmentMetadataReader.java          | 161 +++-
 .../pinot/controller/util/TableMetadataReader.java |  76 +-
 .../app/components/AsyncInstanceTable.tsx          | 122 +++
 .../resources/app/components/AsyncPinotSchemas.tsx |  76 ++
 .../resources/app/components/AsyncPinotTables.tsx  | 220 +++++
 .../app/components/Homepage/InstancesTables.tsx    |  34 +-
 .../Homepage/Operations/RebalanceServerTableOp.tsx |  15 +-
 .../app/components/Homepage/TenantsListing.tsx     |  65 +-
 .../src/main/resources/app/components/Loading.tsx  |  13 +-
 .../src/main/resources/app/components/NotFound.tsx |  69 ++
 .../src/main/resources/app/components/Table.tsx    |   2 +-
 .../src/main/resources/app/interfaces/types.d.ts   |  37 +-
 .../src/main/resources/app/pages/HomePage.tsx      | 126 ++-
 .../main/resources/app/pages/InstanceDetails.tsx   | 472 ++++++-----
 .../resources/app/pages/InstanceListingPage.tsx    |  11 +-
 .../src/main/resources/app/pages/Query.tsx         |  10 +-
 .../main/resources/app/pages/SchemaPageDetails.tsx | 323 ++++----
 .../main/resources/app/pages/SegmentDetails.tsx    | 440 +++++++---
 .../main/resources/app/pages/TablesListingPage.tsx | 112 +--
 .../src/main/resources/app/pages/TaskQueue.tsx     |   2 +-
 .../src/main/resources/app/pages/TenantDetails.tsx | 567 +++++++------
 .../src/main/resources/app/pages/Tenants.tsx       | 107 +--
 .../resources/app/pages/TenantsListingPage.tsx     |  31 +-
 .../src/main/resources/app/requests/index.ts       |  34 +-
 pinot-controller/src/main/resources/app/router.tsx |   1 +
 .../main/resources/app/utils/PinotMethodUtils.ts   |  40 +-
 .../src/main/resources/app/utils/Utils.tsx         |  73 +-
 .../api/PinotTenantRestletResourceTest.java        | 247 +++---
 .../pinot/controller/helix/ControllerTest.java     |   3 +-
 .../PinotHelixResourceManagerStatelessTest.java    |   4 +-
 .../instance/InstanceAssignmentTest.java           |  43 +-
 .../InstanceReplicaGroupPartitionSelectorTest.java | 141 ++--
 .../PinotLLCRealtimeSegmentManagerTest.java        |  15 +
 .../segment/FlushThresholdUpdaterTest.java         | 108 +--
 .../segment/SegmentFlushThresholdComputerTest.java |  69 +-
 .../helix/core/rebalance/TableRebalancerTest.java  | 885 +++++++++++++++++++--
 .../TestZkBasedTableRebalanceObserver.java         |   3 +
 pinot-core/pom.xml                                 |  26 +-
 .../apache/pinot/core/common/MinionConstants.java  |   1 +
 .../apache/pinot/core/common/ObjectSerDeUtils.java |  43 +-
 .../core/data/manager/BaseTableDataManager.java    | 125 +--
 .../core/data/manager/InstanceDataManager.java     |   2 +-
 .../manager/offline/DimensionTableDataManager.java |   9 +-
 .../manager/offline/TableDataManagerProvider.java  |  52 +-
 .../manager/realtime/IngestionDelayTracker.java    |  63 +-
 .../realtime/RealtimeConsumptionRateManager.java   |  39 +-
 .../realtime/RealtimeSegmentDataManager.java       |  73 +-
 .../manager/realtime/RealtimeTableDataManager.java |  72 +-
 .../realtime/SegmentBuildTimeLeaseExtender.java    |   4 +-
 .../core/function/scalar/SketchFunctions.java      |  11 +
 .../blocks/results/GroupByResultsBlock.java        |   6 +
 .../filter/VectorSimilarityFilterOperator.java     | 118 +++
 .../core/operator/query/AggregationOperator.java   |  10 +-
 .../query/FilteredAggregationOperator.java         |  27 +-
 .../operator/query/FilteredGroupByOperator.java    |  42 +-
 .../pinot/core/operator/query/GroupByOperator.java |  19 +-
 .../function/ArrayLiteralTransformFunction.java    |  72 ++
 .../transform/function/CastTransformFunction.java  |  17 +
 .../ClpEncodedVarsMatchTransformFunction.java      | 150 ++++
 .../function/TransformFunctionFactory.java         |   9 +-
 .../pinot/core/plan/AggregationPlanNode.java       |  63 +-
 .../org/apache/pinot/core/plan/FilterPlanNode.java |  18 +-
 .../apache/pinot/core/plan/GroupByPlanNode.java    |  59 +-
 .../function/AggregationFunctionUtils.java         | 169 ++--
 ...inctCountRawThetaSketchAggregationFunction.java |  18 +-
 ...istinctCountThetaSketchAggregationFunction.java | 193 +++--
 .../function/HistogramAggregationFunction.java     |  33 +-
 .../core/query/config/QueryExecutorConfig.java     |   2 +-
 .../pinot/core/query/executor/QueryExecutor.java   |   2 +-
 .../query/executor/ServerQueryExecutorV1Impl.java  |  33 +-
 .../pinot/core/query/logger/ServerQueryLogger.java |   2 +-
 .../pinot/core/query/optimizer/QueryOptimizer.java |   8 +-
 .../optimizer/filter/NumericalFilterOptimizer.java |  24 +-
 .../optimizer/filter/TextMatchFilterOptimizer.java | 186 +++++
 .../query/pruner/ColumnValueSegmentPruner.java     |   5 +-
 .../core/query/pruner/ValueBasedSegmentPruner.java |  11 +-
 .../pinot/core/query/reduce/BaseReduceService.java |   3 +
 .../core/query/reduce/BrokerReduceService.java     |  14 +-
 .../core/query/reduce/DataTableReducerContext.java |   8 +-
 .../query/reduce/ExecutionStatsAggregator.java     |   6 +
 .../core/query/reduce/GroupByDataTableReducer.java |   3 +-
 .../core/query/reduce/StreamingReduceService.java  |  13 +-
 .../core/query/request/ServerQueryRequest.java     |   8 +
 .../context/utils/QueryContextConverterUtils.java  |   8 +
 .../pinot/core/query/scheduler/QueryScheduler.java |  12 +
 .../query/selection/SelectionOperatorUtils.java    |   5 +
 .../apache/pinot/core/routing/RoutingTable.java    |  13 +-
 .../processing/framework/SegmentConfig.java        |  30 +-
 .../framework/SegmentProcessorFramework.java       | 139 +++-
 .../genericrow/AdaptiveConstraintsWriter.java      |  17 +-
 .../genericrow/AdaptiveSizeBasedWriter.java        |  33 +-
 .../segment/processing/genericrow/FileWriter.java  |  14 +-
 .../genericrow/GenericRowFileWriter.java           |  11 +-
 .../segment/processing/mapper/SegmentMapper.java   |  70 +-
 .../partitioner/TableConfigPartitioner.java        |   3 +-
 .../apache/pinot/core/startree/StarTreeUtils.java  |  55 +-
 .../executor/StarTreeAggregationExecutor.java      |   2 +-
 .../startree/executor/StarTreeGroupByExecutor.java |  18 +-
 .../pinot/core/transport/DirectOOMHandler.java     |   2 +-
 .../apache/pinot/core/transport/QueryRouter.java   |  24 +-
 .../pinot/core/transport/grpc/GrpcQueryServer.java |  12 +-
 .../apache/pinot/core/util/ListenerConfigUtil.java |   2 +-
 .../java/org/apache/pinot/core/util/OsCheck.java   |  18 +-
 .../server/access/ZkBasicAuthAccessFactory.java    |  26 +-
 .../accounting/ResourceManagerAccountingTest.java  | 145 ++++
 .../pinot/core/common/ObjectSerDeUtilsTest.java    |  51 ++
 .../function/InbuiltFunctionEvaluatorTest.java     |   2 +-
 .../BaseTableDataManagerAcquireSegmentTest.java    |  40 +-
 .../data/manager/BaseTableDataManagerTest.java     |  90 +--
 .../offline/DimensionTableDataManagerTest.java     |  95 ++-
 .../RealtimeConsumptionRateManagerTest.java        |  49 +-
 .../realtime/RealtimeSegmentDataManagerTest.java   |  73 +-
 .../realtime/RealtimeTableDataManagerTest.java     |  77 +-
 .../core/function/scalar/SketchFunctionsTest.java  |  22 +
 .../combine/SelectionCombineOperatorTest.java      |  50 +-
 ...ionTest.java => ClpTransformFunctionsTest.java} | 111 ++-
 ...adataAndDictionaryAggregationPlanMakerTest.java |  18 +-
 .../executor/QueryExecutorExceptionsTest.java      |  34 +-
 .../core/query/executor/QueryExecutorTest.java     |  54 +-
 .../core/query/optimizer/QueryOptimizerTest.java   |  55 ++
 .../BrokerRequestToQueryContextConverterTest.java  |  89 ++-
 .../query/scheduler/PrioritySchedulerTest.java     |   2 +-
 .../fakestream/FakeStreamMetadataProvider.java     |   8 +
 .../core/realtime/stream/StreamConfigTest.java     |  27 +
 .../framework/SegmentProcessorFrameworkTest.java   | 205 ++++-
 .../pinot/core/startree/v2/BaseStarTreeV2Test.java |  35 +-
 .../v2/DistinctCountRawHLLStarTreeV2Test.java      |  58 ++
 .../pinot/core/transport/QueryRoutingTest.java     |   5 +-
 .../stats/ServerRoutingStatsManagerTest.java       |  90 ++-
 .../DistinctCountThetaSketchQueriesTest.java       |  32 +-
 .../pinot/queries/ExplainPlanQueriesTest.java      |  49 +-
 .../apache/pinot/queries/HistogramQueriesTest.java |   4 +-
 .../pinot/queries/JsonMalformedIndexTest.java      | 131 +++
 .../org/apache/pinot/queries/QueriesTestUtils.java |  25 +
 .../queries/SegmentWithNullValueVectorTest.java    |  60 +-
 .../src/test/resources/data/dimBaseballTeams.avro  | Bin 2214 -> 0 bytes
 .../src/test/resources/data/dimBaseballTeams.avsc  |   8 -
 .../src/test/resources/data/dimBaseballTeams.csv   |   1 +
 .../resources/data/dimBaseballTeams_config.json    |   0
 pinot-distribution/pom.xml                         |  14 +-
 pinot-integration-test-base/pom.xml                |   4 +
 .../tests/BaseClusterIntegrationTest.java          |  20 +-
 .../pinot/integration/tests/QueryGenerator.java    |   3 +-
 pinot-integration-tests/pom.xml                    |   4 +
 .../tests/BaseRealtimeClusterIntegrationTest.java  |   6 +
 .../tests/HybridClusterIntegrationTest.java        |  78 +-
 .../tests/LLCRealtimeClusterIntegrationTest.java   | 149 ++++
 .../tests/OfflineClusterIntegrationTest.java       | 125 ++-
 ...flineClusterMemBasedServerQueryKillingTest.java |  14 +
 .../tests/OfflineGRPCServerIntegrationTest.java    |   2 +-
 ...PartialUpsertTableRebalanceIntegrationTest.java |   9 +-
 ...nsumptionRateLimiterClusterIntegrationTest.java | 298 +++++++
 ...gmentGenerationMinionRealtimeIngestionTest.java | 167 ++++
 ...PartitionLLCRealtimeClusterIntegrationTest.java |   2 +-
 .../tests/StarTreeClusterIntegrationTest.java      |  52 +-
 ...sertCompactionMinionClusterIntegrationTest.java | 221 -----
 .../tests/UpsertTableIntegrationTest.java          | 398 +++++++--
 .../UpsertTableSegmentPreloadIntegrationTest.java  |  11 +-
 .../pinot/integration/tests/custom/ArrayTest.java  | 196 ++++-
 .../integration/tests/custom/ThetaSketchTest.java  |  11 +
 .../integration/tests/custom/TimestampTest.java    |  54 +-
 .../models/DummyTableUpsertMetadataManager.java    | 115 +++
 .../minion/tasks/TestTaskExecutorFactory.java      |   3 +-
 .../tests/startree/StarTreeQueryGenerator.java     |  17 +-
 .../src/test/resources/gameScores_large_csv.tar.gz | Bin 0 -> 12810 bytes
 .../src/test/resources/test_null_handling.schema   |   1 -
 .../test/resources/upsert_compaction_test.tar.gz   | Bin 9405 -> 0 bytes
 .../org/apache/pinot/minion/BaseMinionStarter.java |   1 +
 .../org/apache/pinot/minion/MinionContext.java     |   2 +
 .../minion/taskfactory/TaskFactoryRegistry.java    |   2 +-
 .../org/apache/pinot/minion/MinionConfTest.java    |  15 +-
 .../batch/hadoop/HadoopSegmentCreationMapper.java  |   2 +-
 pinot-plugins/pinot-file-system/pinot-adls/pom.xml |   4 +-
 pinot-plugins/pinot-file-system/pinot-gcs/pom.xml  |   4 -
 pinot-plugins/pinot-file-system/pinot-s3/pom.xml   |   6 +-
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  |   2 +-
 .../inputformat/clplog/CLPLogRecordExtractor.java  |   8 +-
 .../clplog/CLPLogRecordExtractorTest.java          |  14 +-
 .../parquet/ParquetRecordReaderConfig.java         |   2 +-
 .../tasks/BaseSingleSegmentConversionExecutor.java |   4 +-
 .../pinot/plugin/minion/tasks/MergeTaskUtils.java  |   4 +
 .../pinot/plugin/minion/tasks/MinionTaskUtils.java |  41 +
 .../mergerollup/MergeRollupTaskGenerator.java      |   4 +-
 .../minion/tasks/purge/PurgeTaskExecutor.java      |  19 +-
 .../minion/tasks/purge/PurgeTaskGenerator.java     |   2 +-
 .../RealtimeToOfflineSegmentsTaskGenerator.java    |   6 +-
 .../SegmentGenerationAndPushTaskGenerator.java     |  41 +-
 .../UpsertCompactionTaskExecutor.java              | 141 +---
 .../UpsertCompactionTaskGenerator.java             | 120 +--
 .../plugin/minion/tasks/MergeTaskUtilsTest.java    |   6 +-
 .../minion/tasks/purge/PurgeTaskExecutorTest.java  |   3 +-
 .../UpsertCompactionTaskExecutorTest.java          |   7 +-
 .../UpsertCompactionTaskGeneratorTest.java         |  86 +-
 .../pinot-stream-ingestion/pinot-kafka-2.0/pom.xml |  13 +
 .../KafkaPartitionLevelConnectionHandler.java      |   1 +
 .../pinot/plugin/stream/kafka20/KafkaSSLUtils.java | 339 ++++++++
 .../kafka20/KafkaStreamMetadataProvider.java       |  19 +
 .../plugin/stream/kafka20/KafkaSSLUtilsTest.java   | 310 ++++++++
 .../stream/kinesis/KinesisConnectionHandler.java   |   2 +-
 .../stream/kinesis/server/KinesisDataProducer.java |   2 +-
 .../pinot-stream-ingestion/pinot-pulsar/pom.xml    |  12 +-
 .../pinot/plugin/stream/pulsar/PulsarConfig.java   |  89 ++-
 .../PulsarPartitionLevelConnectionHandler.java     |  26 +
 .../pulsar/PulsarStreamMetadataProvider.java       |  19 +-
 .../plugin/stream/pulsar/PulsarConfigTest.java     |  58 ++
 .../apache/calcite/rel/hint/PinotHintOptions.java  |   5 +
 .../rel/rules/PinotEvaluateLiteralRule.java        |  53 +-
 .../rel/rules/PinotJoinToDynamicBroadcastRule.java |  24 +-
 .../calcite/rel/rules/PinotQueryRuleSets.java      |  22 +-
 .../rel/rules/PinotRelDistributionTraitRule.java   | 177 +++++
 .../apache/calcite/rel/rules/PinotRuleUtils.java   |  48 +-
 .../rel/rules/PinotSortExchangeCopyRule.java       |   6 +
 .../org/apache/pinot/query/QueryEnvironment.java   | 109 ++-
 .../apache/pinot/query/context/PlannerContext.java |  13 +-
 .../query/parser/CalciteRexExpressionParser.java   |   8 +-
 .../apache/pinot/query/planner/PlanFragment.java   |   9 +-
 .../{ => explain}/PhysicalExplainPlanVisitor.java  |  83 +-
 .../query/planner/logical/LogicalPlanner.java      |   6 +-
 .../planner/logical/PinotLogicalQueryPlanner.java  |  11 +-
 .../query/planner/logical/PlanFragmenter.java      |   6 +-
 .../planner/logical/RelToPlanNodeConverter.java    |   7 +-
 .../planner/physical/DispatchablePlanContext.java  |   1 -
 .../{ => physical}/DispatchablePlanFragment.java   |   3 +-
 .../planner/physical/DispatchablePlanMetadata.java |  43 +-
 .../planner/physical/DispatchablePlanVisitor.java  |   3 +-
 .../{ => physical}/DispatchableSubPlan.java        |   2 +-
 .../planner/physical/MailboxAssignmentVisitor.java |  31 +-
 .../planner/physical/PinotDispatchPlanner.java     |   3 -
 .../query/planner/plannode/AggregateNode.java      |   2 +-
 .../pinot/query/planner/plannode/ExchangeNode.java |  10 +-
 .../query/planner/plannode/MailboxReceiveNode.java |   6 +-
 .../query/planner/plannode/MailboxSendNode.java    |  21 +-
 .../query/planner/plannode/PlanNodeVisitor.java    |   5 +-
 .../pinot/query/planner/plannode/WindowNode.java   |   2 +-
 .../apache/pinot/query/routing/WorkerManager.java  | 426 +++++-----
 .../org/apache/pinot/query/type/TypeFactory.java   |  71 +-
 .../rel/rules/PinotSortExchangeCopyRuleTest.java   |  12 +-
 .../apache/pinot/query/QueryCompilationTest.java   |  69 +-
 .../pinot/query/QueryEnvironmentTestBase.java      |  23 +-
 .../query/planner/plannode/SerDeUtilsTest.java     |   4 +-
 .../query/queries/ResourceBasedQueryPlansTest.java |  26 +-
 .../query/testutils/MockRoutingManagerFactory.java |  12 +-
 .../apache/pinot/query/type/TypeFactoryTest.java   | 221 ++++-
 .../src/test/resources/queries/AggregatePlans.json |  49 +-
 .../test/resources/queries/BasicQueryPlans.json    |  10 +-
 .../resources/queries/ExplainPhysicalPlans.json    | 455 +++++++++++
 .../src/test/resources/queries/GroupByPlans.json   |  61 +-
 .../src/test/resources/queries/JoinPlans.json      | 200 +++--
 .../resources/queries/LiteralEvaluationPlans.json  | 114 ++-
 .../src/test/resources/queries/OrderByPlans.json   |  81 +-
 .../test/resources/queries/PinotHintablePlans.json | 427 +++++++++-
 .../src/test/resources/queries/SetOpPlans.json     |   2 +-
 .../resources/queries/ValidationErrorPlan.json     |   8 +-
 .../resources/queries/WindowFunctionPlans.json     | 716 ++++++++---------
 .../query/mailbox/InMemorySendingMailbox.java      |   2 +-
 .../apache/pinot/query/runtime/QueryRunner.java    |  31 +-
 .../query/runtime/operator/AggregateOperator.java  |  35 +-
 .../query/runtime/operator/HashJoinOperator.java   |  59 +-
 .../LeafStageTransferableBlockOperator.java        |  37 +-
 .../runtime/operator/LiteralValueOperator.java     |   3 +-
 .../query/runtime/operator/MultiStageOperator.java |  16 +-
 .../query/runtime/operator/OperatorStats.java      |   2 -
 .../pinot/query/runtime/operator/SetOperator.java  |  18 +-
 .../pinot/query/runtime/operator/SortOperator.java | 107 ++-
 .../query/runtime/operator/TransformOperator.java  |  24 +-
 .../runtime/operator/WindowAggregateOperator.java  |  21 +-
 .../runtime/operator/operands/FunctionOperand.java |  21 +-
 .../runtime/operator/utils/OperatorUtils.java      |   2 +-
 .../query/runtime/operator/utils/TypeUtils.java    |   8 +
 .../runtime/plan/OpChainExecutionContext.java      |  11 +
 .../query/runtime/plan/PhysicalPlanVisitor.java    |  31 +-
 .../runtime/plan/serde/QueryPlanSerDeUtils.java    |   4 +-
 .../plan/server/ServerPlanRequestContext.java      |  68 +-
 .../plan/server/ServerPlanRequestUtils.java        | 157 ++--
 .../plan/server/ServerPlanRequestVisitor.java      | 135 ++--
 .../query/service/dispatch/QueryDispatcher.java    |   4 +-
 .../pinot/query/service/server/QueryServer.java    |   3 +-
 .../runtime/operator/AggregateOperatorTest.java    |  50 ++
 .../runtime/operator/HashJoinOperatorTest.java     |  16 +-
 .../query/runtime/queries/QueryRunnerTest.java     |  36 +-
 .../query/runtime/queries/QueryRunnerTestBase.java |  62 +-
 .../runtime/queries/ResourceBasedQueriesTest.java  |  14 +-
 .../service/dispatch/QueryDispatcherTest.java      |   2 +-
 .../query/service/server/QueryServerTest.java      |   4 +-
 .../testutils/MockInstanceDataManagerFactory.java  |  12 +-
 .../src/test/resources/queries/BasicQuery.json     |   4 +
 .../src/test/resources/queries/CharacterTypes.json |   2 -
 .../src/test/resources/queries/CountDistinct.json  |   8 +-
 .../test/resources/queries/FromExpressions.json    |  30 +
 .../test/resources/queries/MetadataTestQuery.json  |   2 +-
 .../src/test/resources/queries/NullHandling.json   | 338 +++++++-
 .../src/test/resources/queries/QueryHints.json     | 391 ++++++++-
 .../test/resources/queries/StringFunctions.json    |   2 +
 .../src/test/resources/queries/WithStatements.json |  21 +
 pinot-segment-local/pom.xml                        |  13 +-
 .../DistinctCountHLLPlusValueAggregator.java       |   2 +-
 .../local/customobject/ThetaSketchAccumulator.java | 135 ++++
 .../local/data/manager/TableDataManager.java       |  34 +-
 .../local/data/manager/TableDataManagerConfig.java | 113 ---
 .../local/data/manager/TableDataManagerParams.java |  66 --
 .../local/function/InbuiltFunctionEvaluator.java   |   6 +
 .../immutable/ImmutableSegmentImpl.java            |  18 +-
 .../immutable/ImmutableSegmentLoader.java          |  21 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   |  30 +-
 .../local/io/util/FixedBitIntReaderWriter.java     |   8 +
 .../segment/local/io/util/PinotDataBitSet.java     |   6 +
 .../FixedBitMVEntryDictForwardIndexWriter.java     | 126 +++
 .../local/realtime/impl/RealtimeSegmentConfig.java |  28 +-
 .../invertedindex/RealtimeLuceneTextIndex.java     |  15 +-
 .../RealtimeLuceneTextIndexSearcherPool.java       |   6 +-
 .../realtime/impl/json/MutableJsonIndexImpl.java   |   1 +
 .../realtime/impl/vector/MutableVectorIndex.java   | 154 ++++
 .../recordtransformer/CompositeTransformer.java    |  16 +-
 .../recordtransformer/SpecialValueTransformer.java | 128 +++
 .../creator/impl/ColumnJsonParserException.java    |  48 ++
 .../creator/impl/SegmentColumnarIndexCreator.java  |  56 +-
 .../MultiValueEntryDictForwardIndexCreator.java    |  67 ++
 .../creator/impl/inv/RangeIndexCreator.java        |   5 +-
 .../impl/inv/text/LuceneFSTIndexCreator.java       |   7 +-
 .../stats/AbstractColumnStatisticsCollector.java   |  13 +-
 .../BigDecimalColumnPreIndexStatsCollector.java    |   4 +-
 .../stats/BytesColumnPredIndexStatsCollector.java  |   4 +-
 .../stats/DoubleColumnPreIndexStatsCollector.java  |   4 +-
 .../stats/FloatColumnPreIndexStatsCollector.java   |   4 +-
 .../stats/IntColumnPreIndexStatsCollector.java     |   4 +-
 .../stats/LongColumnPreIndexStatsCollector.java    |   4 +-
 .../stats/StringColumnPreIndexStatsCollector.java  |   4 +-
 .../creator/impl/text/LuceneTextIndexCreator.java  |  36 +-
 .../impl/vector/HnswVectorIndexCreator.java        | 120 +++
 .../creator/impl/vector/XKnnFloatVectorField.java  |  68 ++
 .../creator/impl/vector/lucene95/HnswCodec.java    | 139 ++++
 .../impl/vector/lucene95/HnswVectorsFormat.java    |  91 +++
 .../converter/SegmentV1V2ToV3FormatConverter.java  |  45 +-
 .../segment/index/datasource/BaseDataSource.java   |   7 +
 .../index/dictionary/DictionaryIndexType.java      |  55 +-
 .../index/dictionary/DictionaryInternerHolder.java |  58 ++
 .../index/forward/ForwardIndexCreatorFactory.java  |  10 +-
 .../index/forward/ForwardIndexReaderFactory.java   |  11 +-
 .../segment/index/forward/ForwardIndexType.java    |  78 +-
 .../local/segment/index/fst/FstIndexType.java      |   5 +-
 .../segment/index/loader/ForwardIndexHandler.java  | 284 ++++---
 .../segment/index/loader/IndexLoadingConfig.java   |  44 +-
 .../local/segment/index/loader/LoaderUtils.java    |   2 +-
 .../ColumnMinMaxValueGenerator.java                |   4 +-
 .../defaultcolumn/BaseDefaultColumnHandler.java    |  39 +-
 .../loader/invertedindex/FSTIndexHandler.java      |   4 +-
 .../loader/invertedindex/VectorIndexHandler.java   | 215 +++++
 .../index/nullvalue/NullValueIndexType.java        |  41 +-
 .../index/readers/LuceneFSTIndexReader.java        |   3 +-
 .../index/readers/OnHeapStringDictionary.java      |  17 +-
 .../segment/index/readers/StringDictionary.java    |  32 +
 .../FixedBitMVEntryDictForwardIndexReader.java     | 155 ++++
 .../readers/json/ImmutableJsonIndexReader.java     |   1 +
 .../index/readers/text/LuceneTextIndexReader.java  |  25 +-
 .../index/readers/vector/HnswDocIdCollector.java   |  78 ++
 .../HnswVectorIndexReader.java}                    |  97 +--
 .../segment/index/text/TextIndexConfigBuilder.java |   3 +
 .../local/segment/index/text/TextIndexType.java    |   6 +-
 .../segment/index/vector/VectorIndexPlugin.java    |  17 +-
 .../segment/index/vector/VectorIndexType.java      | 174 ++++
 .../readers/CompactedPinotSegmentRecordReader.java | 111 +++
 .../segment/readers/PinotSegmentColumnReader.java  |  38 +-
 .../local/segment/store/FilePerIndexDirectory.java |   2 +
 .../segment/store/SegmentLocalFSDirectory.java     |  10 +-
 .../segment/store/SingleFileIndexDirectory.java    |  11 +-
 .../local/segment/store/StarTreeIndexReader.java   |  34 +-
 .../local/segment/store/TextIndexUtils.java        |  35 +-
 .../local/segment/store/VectorIndexUtils.java      |  98 +++
 .../local/startree/StarTreeBuilderUtils.java       |   4 +-
 .../startree/v2/builder/BaseSingleTreeBuilder.java |   2 +-
 .../startree/v2/builder/MultipleTreesBuilder.java  |  17 +-
 .../v2/builder/OffHeapSingleTreeBuilder.java       |   2 +-
 .../v2/builder/OnHeapSingleTreeBuilder.java        |   2 +-
 .../v2/builder/StarTreeIndexSeparator.java         |   7 +-
 .../v2/builder/StarTreeV2BuilderConfig.java        |  14 +-
 .../startree/v2/store/StarTreeIndexMapUtils.java   |  34 +-
 .../upsert/BasePartitionUpsertMetadataManager.java | 157 ++--
 .../upsert/BaseTableUpsertMetadataManager.java     | 133 ++--
 ...oncurrentMapPartitionUpsertMetadataManager.java |  78 +-
 .../ConcurrentMapTableUpsertMetadataManager.java   |   4 +-
 .../local/upsert/TableUpsertMetadataManager.java   |   6 +-
 .../upsert/TableUpsertMetadataManagerFactory.java  |  29 +-
 .../pinot/segment/local/upsert/UpsertContext.java  | 197 +++++
 .../local/utils/ConsistentDataPushUtils.java       |   2 +-
 .../segment/local/utils/CustomSerDeUtils.java      |   9 +-
 .../pinot/segment/local/utils/SegmentLocks.java    |  31 +-
 .../segment/local/utils/SegmentPushUtils.java      |   2 +-
 .../segment/local/utils/TableConfigUtils.java      | 164 ++--
 .../pinot/segment/local/utils/fst/FSTBuilder.java  |  19 +-
 .../segment/local/utils/fst/RegexpMatcher.java     |  11 +-
 .../utils/nativefst/NativeFSTIndexCreator.java     |   2 +-
 ...istinctCountThetaSketchValueAggregatorTest.java |  13 +
 .../customobject/ThetaSketchAccumulatorTest.java   | 104 +++
 .../mutable/MutableSegmentImplTestUtils.java       |   6 +-
 .../MutableSegmentImplUpsertComparisonColTest.java | 110 ++-
 .../mutable/MutableSegmentImplUpsertTest.java      |  29 +-
 .../invertedindex/LuceneMutableTextIndexTest.java  |   5 +-
 .../NativeAndLuceneMutableTextIndexTest.java       |   7 +-
 .../recordtransformer/RecordTransformerTest.java   | 127 ++-
 .../impl/SegmentColumnarIndexCreatorTest.java      |  13 +-
 .../segment/index/SegmentMetadataImplTest.java     |   3 +-
 .../index/creator/LuceneFSTIndexCreatorTest.java   |   4 +-
 .../index/creator/NativeFSTIndexCreatorTest.java   |   4 +-
 .../index/dictionary/DictionaryIndexTypeTest.java  |  70 +-
 .../FixedBitMVEntryDictForwardIndexTest.java       | 123 +++
 .../index/forward/ForwardIndexTypeTest.java        |  56 +-
 .../index/loader/ForwardIndexHandlerTest.java      | 160 +++-
 .../local/segment/index/loader/LoaderTest.java     | 208 ++++-
 .../index/loader/SegmentPreProcessorTest.java      |  45 +-
 .../index/nullvalue/NullValueIndexTypeTest.java    |  66 ++
 .../index/readers/ImmutableDictionaryTest.java     |   2 +-
 .../ImmutableDictionaryTypeConversionTest.java     |  25 +-
 .../text/LuceneTextIndexCompatibleTest.java        |  32 +-
 .../CompactedPinotSegmentRecordReaderTest.java     | 131 +++
 .../segment/store/FilePerIndexDirectoryTest.java   |  24 +-
 .../store/SingleFileIndexDirectoryTest.java        |  26 +-
 .../segment/store/StarTreeIndexReaderTest.java     |   3 +-
 .../v2/builder/StarTreeIndexSeparatorTest.java     |   5 +-
 .../v2/builder/StarTreeV2BuilderConfigTest.java    |  57 +-
 .../BasePartitionUpsertMetadataManagerTest.java    | 174 ++++
 ...rrentMapPartitionUpsertMetadataManagerTest.java | 252 ++++--
 ...oncurrentMapTableUpsertMetadataManagerTest.java |  27 +-
 .../segment/local/utils/TableConfigUtilsTest.java  | 321 +++++++-
 .../segment/local/utils/fst/FSTBuilderTest.java    |   4 +-
 .../data/lucene_80_index/Text.lucene.index/_0.cfe  | Bin 0 -> 299 bytes
 .../data/lucene_80_index/Text.lucene.index/_0.cfs  | Bin 0 -> 136123 bytes
 .../data/lucene_80_index/Text.lucene.index/_0.si   | Bin 0 -> 370 bytes
 .../lucene_80_index/Text.lucene.index/segments_1   | Bin 0 -> 137 bytes
 .../lucene_80_index/Text.lucene.index/write.lock   |   0
 .../data/test_upsert_comparison_col_schema.json    |   4 +
 .../src/test/resources/data/test_vector_data.avro  | Bin 0 -> 6219422 bytes
 .../org/apache/pinot/segment/spi/V1Constants.java  |   7 +-
 .../spi/compression/DictIdCompressionType.java     |  35 +-
 .../pinot/segment/spi/creator/SegmentCreator.java  |   2 +-
 .../spi/creator/SegmentGeneratorConfig.java        |   2 +-
 .../pinot/segment/spi/datasource/DataSource.java   |   7 +
 .../segment/spi/index/DictionaryIndexConfig.java   |  37 +-
 .../segment/spi/index/ForwardIndexConfig.java      |  81 +-
 .../pinot/segment/spi/index/StandardIndexes.java   |   9 +
 .../pinot/segment/spi/index/TextIndexConfig.java   |  31 +-
 .../spi/index/creator/VectorIndexConfig.java       | 126 +++
 .../spi/index/creator/VectorIndexCreator.java      |  33 +-
 .../spi/index/metadata/ColumnMetadataImpl.java     |   4 +-
 .../spi/index/metadata/SegmentMetadataImpl.java    |  12 +-
 .../pinot/segment/spi/index/reader/Dictionary.java |  24 +
 .../spi/index/reader/ForwardIndexReader.java       |  11 +-
 .../spi/index/reader/VectorIndexReader.java        |  20 +-
 .../startree/AggregationFunctionColumnPair.java    |  42 +
 .../spi/index/startree/AggregationSpec.java        |   8 +
 .../spi/index/startree/StarTreeV2Metadata.java     |  16 +-
 .../pinot/segment/spi/memory/PinotByteBuffer.java  |   6 +-
 .../BoundedColumnValuePartitionFunction.java       |   4 +-
 .../spi/partition/ByteArrayPartitionFunction.java  |   4 +-
 .../spi/partition/HashCodePartitionFunction.java   |   4 +-
 .../spi/partition/ModuloPartitionFunction.java     |  15 +-
 .../spi/partition/Murmur3PartitionFunction.java    | 289 +++++++
 .../spi/partition/MurmurPartitionFunction.java     |   6 +-
 .../segment/spi/partition/PartitionFunction.java   |   3 +-
 .../spi/partition/PartitionFunctionFactory.java    |   8 +-
 .../pinot/segment/spi/store/SegmentDirectory.java  |   2 +-
 .../segment/spi/store/SegmentDirectoryPaths.java   |  25 +-
 .../segment/spi/utils/SegmentMetadataUtils.java    |  17 +-
 .../AggregationFunctionColumnPairTest.java         |  59 ++
 .../spi/index/startree/StarTreeV2MetadataTest.java | 130 +++
 .../spi/partition/PartitionFunctionTest.java       | 307 ++++++-
 pinot-server/pom.xml                               |  14 -
 .../pinot/server/api/AdminApiApplication.java      |  10 +-
 .../pinot/server/api/resources/TablesResource.java |  88 +-
 .../server/starter/helix/BaseServerStarter.java    |   5 +-
 .../starter/helix/HelixInstanceDataManager.java    |  23 +-
 .../helix/HelixInstanceDataManagerConfig.java      | 133 ++--
 .../apache/pinot/server/api/BaseResourceTest.java  |  65 +-
 .../pinot/server/api/TablesResourceTest.java       |  25 +-
 pinot-spi/pom.xml                                  |  18 +-
 .../pinot/spi/annotations/ScalarFunction.java      |   5 +
 .../config/instance/InstanceDataManagerConfig.java |   8 +
 .../spi/config/table/DimensionTableConfig.java     |  11 +-
 .../apache/pinot/spi/config/table/FieldConfig.java |  29 +-
 .../pinot/spi/config/table/IndexingConfig.java     |  28 +
 .../org/apache/pinot/spi/config/table/Intern.java  |  79 ++
 .../pinot/spi/config/table/RoutingConfig.java      |  10 +-
 .../apache/pinot/spi/config/table/TableConfig.java |  11 +
 .../pinot/spi/config/table/UpsertConfig.java       |  27 +-
 .../pinot/spi/config/task/AdhocTaskConfig.java     |   2 +
 .../apache/pinot/spi/data/DateTimeFormatSpec.java  |   2 +-
 .../java/org/apache/pinot/spi/data/FieldSpec.java  |  52 +-
 .../java/org/apache/pinot/spi/data/Schema.java     |  67 +-
 .../spi/data/readers/RecordReaderFileConfig.java   |  51 +-
 .../pinot/spi/env/CommonsConfigurationUtils.java   | 184 +++--
 .../pinot/spi/env/ConfigFilePropertyReader.java    |   6 +-
 .../spi/env/ConfigFilePropertyReaderFactory.java   |   8 +-
 .../apache/pinot/spi/env/PinotConfiguration.java   |  30 +-
 .../PinotBrokerQueryEventListenerFactory.java      |  34 +
 .../org/apache/pinot/spi/plugin/PluginManager.java |   2 +-
 .../org/apache/pinot/spi/stream/StreamConfig.java  |   7 +-
 .../pinot/spi/stream/StreamMetadataProvider.java   |  11 +-
 .../pinot/spi/trace/DefaultRequestContext.java     |  11 +
 .../org/apache/pinot/spi/trace/RequestContext.java |   4 +
 .../org/apache/pinot/spi/utils/BooleanUtils.java   |  11 +
 .../apache/pinot/spi/utils/CommonConstants.java    |  20 +-
 .../org/apache/pinot/spi/utils/FALFInterner.java   | 148 ++++
 .../java/org/apache/pinot/spi/utils/JsonUtils.java |   2 +-
 .../java/org/apache/pinot/spi/utils/Pairs.java     |  23 +-
 .../utils/builder/ControllerRequestURLBuilder.java |   4 +
 .../pinot/spi/config/table/IndexingConfigTest.java |   7 +-
 .../pinot/spi/config/task/AdhocTaskConfigTest.java |  18 +
 .../spi/env/CommonsConfigurationUtilsTest.java     |  15 +-
 .../pinot/spi/env/PinotConfigurationTest.java      |  11 +-
 pinot-tools/pom.xml                                |   4 +-
 .../org/apache/pinot/tools/HybridQuickstart.java   |   3 +-
 .../apache/pinot/tools/PinotTableRebalancer.java   |   4 +-
 .../org/apache/pinot/tools/QuickStartBase.java     | 462 ++++++++++-
 .../java/org/apache/pinot/tools/Quickstart.java    | 205 +----
 .../org/apache/pinot/tools/RealtimeQuickStart.java |   5 +-
 .../java/org/apache/pinot/tools/SpeedTest.java     |  49 +-
 .../admin/command/AbstractBaseAdminCommand.java    |  13 +
 .../pinot/tools/admin/command/AddTableCommand.java |  26 +-
 .../admin/command/ChangeNumReplicasCommand.java    |   2 +-
 .../tools/admin/command/GenerateDataCommand.java   |  88 +-
 .../tools/admin/command/RebalanceTableCommand.java |  10 +-
 .../tools/filesystem/PinotFSBenchmarkDriver.java   |   9 +-
 .../converter/DictionaryToRawIndexConverter.java   |  13 +-
 .../tools/streams/AvroFileSourceGenerator.java     |   2 +-
 .../org/apache/pinot/tools/utils/JarUtils.java     |  47 +-
 .../fineFoodReviews_offline_table_config.json      |  25 +-
 .../fineFoodReviews_realtime_table_config.json     |  73 ++
 .../fineFoodReviews/fineFoodReviews_schema.json    |  48 ++
 .../fine_food_reviews_with_embeddings_1k.json.gz   | Bin 0 -> 8101775 bytes
 pom.xml                                            | 111 ++-
 709 files changed, 27584 insertions(+), 8841 deletions(-)
 create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/request/context/predicate/VectorSimilarityPredicate.java
 create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java
 create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java
 delete mode 100644 pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/CLPDecodeRewriter.java
 create mode 100644 pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ClpRewriter.java
 create mode 100644 pinot-common/src/test/java/org/apache/pinot/common/utils/FALFInternerTest.java
 create mode 100644 pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java
 create mode 100644 pinot-common/src/test/java/org/apache/pinot/common/utils/TlsUtilsTest.java
 delete mode 100644 pinot-common/src/test/java/org/apache/pinot/sql/parsers/rewriter/CLPDecodeRewriterTest.java
 create mode 100644 pinot-common/src/test/java/org/apache/pinot/sql/parsers/rewriter/ClpRewriterTest.java
 create mode 100644 pinot-common/src/test/resources/tls/keystore-updated.p12
 create mode 100644 pinot-common/src/test/resources/tls/keystore.p12
 create mode 100644 pinot-common/src/test/resources/tls/truststore-updated.p12
 create mode 100644 pinot-common/src/test/resources/tls/truststore.p12
 create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/DataGenerationHelpers.java
 create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DateTimeGenerator.java
 rename pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/{generator => writer}/AvroWriter.java (59%)
 copy pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMetrics.java => pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriterSpec.java (50%)
 create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java
 create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java
 copy pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionTimer.java => pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriterSpec.java (55%)
 copy pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionTimer.java => pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/JsonWriter.java (54%)
 copy pinot-common/src/main/java/org/apache/pinot/sql/FilterKind.java => pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/Writer.java (51%)
 copy pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlanFragmentMetadata.java => pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/WriterSpec.java (69%)
 create mode 100644 pinot-controller/src/main/resources/app/components/AsyncInstanceTable.tsx
 create mode 100644 pinot-controller/src/main/resources/app/components/AsyncPinotSchemas.tsx
 create mode 100644 pinot-controller/src/main/resources/app/components/AsyncPinotTables.tsx
 copy pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlanFragmentMetadata.java => pinot-controller/src/main/resources/app/components/Loading.tsx (79%)
 create mode 100644 pinot-controller/src/main/resources/app/components/NotFound.tsx
 create mode 100644 pinot-core/src/main/java/org/apache/pinot/core/operator/filter/VectorSimilarityFilterOperator.java
 create mode 100644 pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ClpEncodedVarsMatchTransformFunction.java
 create mode 100644 pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/TextMatchFilterOptimizer.java
 copy pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlanFragmentMetadata.java => pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveConstraintsWriter.java (70%)
 copy pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionTimer.java => pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java (52%)
 copy pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlanFragmentMetadata.java => pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/FileWriter.java (75%)
 rename pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/{CLPDecodeTransformFunctionTest.java => ClpTransformFunctionsTest.java} (69%)
 create mode 100644 pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountRawHLLStarTreeV2Test.java
 create mode 100644 pinot-core/src/test/java/org/apache/pinot/queries/JsonMalformedIndexTest.java
 delete mode 100644 pinot-core/src/test/resources/data/dimBaseballTeams.avro
 delete mode 100644 pinot-core/src/test/resources/data/dimBaseballTeams.avsc
 copy pinot-tools/src/main/resources/examples/batch/dimBaseballTeams/dimBaseballTeams_offline_table_config.json => pinot-core/src/test/resources/data/dimBaseballTeams_config.json (100%)
 create mode 100644 pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeConsumptionRateLimiterClusterIntegrationTest.java
 create mode 100644 pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionRealtimeIngestionTest.java
 delete mode 100644 pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java
 create mode 100644 pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
 create mode 100644 pinot-integration-tests/src/test/resources/gameScores_large_csv.tar.gz
 delete mode 100644 pinot-integration-tests/src/test/resources/upsert_compaction_test.tar.gz
 create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java
 create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtilsTest.java
 create mode 100644 pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRelDistributionTraitRule.java
 rename pinot-query-planner/src/main/java/org/apache/pinot/query/planner/{ => explain}/PhysicalExplainPlanVisitor.java (73%)
 rename pinot-query-planner/src/main/java/org/apache/pinot/query/planner/{ => physical}/DispatchablePlanFragment.java (97%)
 rename pinot-query-planner/src/main/java/org/apache/pinot/query/planner/{ => physical}/DispatchableSubPlan.java (98%)
 create mode 100644 pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
 delete mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerConfig.java
 delete mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerParams.java
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVEntryDictForwardIndexWriter.java
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/vector/MutableVectorIndex.java
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SpecialValueTransformer.java
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/ColumnJsonParserException.java
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueEntryDictForwardIndexCreator.java
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/vector/HnswVectorIndexCreator.java
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/vector/XKnnFloatVectorField.java
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/vector/lucene95/HnswCodec.java
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/vector/lucene95/HnswVectorsFormat.java
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryInternerHolder.java
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandler.java
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVEntryDictForwardIndexReader.java
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/HnswDocIdCollector.java
 copy pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/{text/LuceneTextIndexReader.java => vector/HnswVectorIndexReader.java} (66%)
 copy pinot-spi/src/main/java/org/apache/pinot/spi/env/ConfigFilePropertyReader.java => pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/vector/VectorIndexPlugin.java (67%)
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/vector/VectorIndexType.java
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/VectorIndexUtils.java
 create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
 create mode 100644 pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java
 create mode 100644 pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitMVEntryDictForwardIndexTest.java
 create mode 100644 pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexTypeTest.java
 copy pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionTimer.java => pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexCompatibleTest.java (58%)
 create mode 100644 pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java
 create mode 100644 pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
 create mode 100644 pinot-segment-local/src/test/resources/data/lucene_80_index/Text.lucene.index/_0.cfe
 create mode 100644 pinot-segment-local/src/test/resources/data/lucene_80_index/Text.lucene.index/_0.cfs
 create mode 100644 pinot-segment-local/src/test/resources/data/lucene_80_index/Text.lucene.index/_0.si
 create mode 100644 pinot-segment-local/src/test/resources/data/lucene_80_index/Text.lucene.index/segments_1
 copy .trivyignore => pinot-segment-local/src/test/resources/data/lucene_80_index/Text.lucene.index/write.lock (100%)
 create mode 100644 pinot-segment-local/src/test/resources/data/test_vector_data.avro
 copy pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionTimer.java => pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/DictIdCompressionType.java (51%)
 create mode 100644 pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/VectorIndexConfig.java
 copy pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionTimer.java => pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/VectorIndexCreator.java (56%)
 rename pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlanFragmentMetadata.java => pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/VectorIndexReader.java (60%)
 create mode 100644 pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
 create mode 100644 pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/startree/AggregationFunctionColumnPairTest.java
 create mode 100644 pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2MetadataTest.java
 create mode 100644 pinot-spi/src/main/java/org/apache/pinot/spi/config/table/Intern.java
 create mode 100644 pinot-spi/src/main/java/org/apache/pinot/spi/utils/FALFInterner.java
 create mode 100644 pinot-tools/src/main/resources/examples/stream/fineFoodReviews/fineFoodReviews_realtime_table_config.json
 create mode 100644 pinot-tools/src/main/resources/examples/stream/fineFoodReviews/fineFoodReviews_schema.json
 create mode 100644 pinot-tools/src/main/resources/examples/stream/fineFoodReviews/rawdata/fine_food_reviews_with_embeddings_1k.json.gz


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


(pinot) 01/05: Enhance the minimizeDataMovement to keep the existing pool assignment

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 9550545be3ea35b2874d79a62d396806c0f0f23f
Author: jlli_LinkedIn <jl...@linkedin.com>
AuthorDate: Sun Nov 5 10:55:11 2023 -0800

    Enhance the minimizeDataMovement to keep the existing pool assignment
---
 .../assignment/InstanceAssignmentConfigUtils.java  |   2 +-
 .../common/utils/config/TableConfigSerDeTest.java  |   2 +-
 .../instance/FDAwareInstancePartitionSelector.java |   6 +-
 .../instance/InstanceAssignmentDriver.java         |  10 +-
 .../instance/InstancePartitionSelector.java        |   4 +-
 .../instance/InstancePartitionSelectorFactory.java |  18 +-
 .../InstanceReplicaGroupPartitionSelector.java     |  78 +++++++--
 .../instance/InstanceTagPoolSelector.java          |  63 ++++++-
 .../MirrorServerSetInstancePartitionSelector.java  |   4 +-
 ...anceAssignmentRestletResourceStatelessTest.java |   6 +-
 .../instance/InstanceAssignmentTest.java           | 193 ++++++++++++++++-----
 .../InstanceReplicaGroupPartitionSelectorTest.java |   2 +-
 .../TableRebalancerClusterStatelessTest.java       |   4 +-
 .../table/assignment/InstanceAssignmentConfig.java |  16 +-
 .../InstanceReplicaGroupPartitionConfig.java       |   2 +
 15 files changed, 311 insertions(+), 99 deletions(-)
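The pool-assignment strategy this commit introduces can be sketched standalone: replica groups that already live in a still-valid pool keep it, and the remaining replica groups are spread across the least-used pools via a min-heap. This is a simplified illustration with hypothetical types and method names, not the actual Pinot classes:

```java
import java.util.*;

public class PoolAssignmentSketch {
  // Assign each replica group to a pool, keeping existing assignments where
  // possible and spreading the remaining replica groups across the least
  // frequently used pools (min-heap keyed on usage count).
  static Map<Integer, Integer> assignPools(int numReplicaGroups, List<Integer> pools,
      Map<Integer, Integer> existingReplicaGroupToPool) {
    Map<Integer, Integer> replicaGroupToPool = new TreeMap<>();
    Map<Integer, Integer> poolUsage = new HashMap<>();
    for (int pool : pools) {
      poolUsage.put(pool, 0);
    }
    // Keep the existing pool for a replica group if that pool is still valid
    for (Map.Entry<Integer, Integer> e : existingReplicaGroupToPool.entrySet()) {
      int replicaGroupId = e.getKey();
      int pool = e.getValue();
      if (replicaGroupId < numReplicaGroups && pools.contains(pool)) {
        replicaGroupToPool.put(replicaGroupId, pool);
        poolUsage.merge(pool, 1, Integer::sum);
      }
    }
    // Min-heap of (usageCount, pool): least-used pool first, pool id breaks ties
    PriorityQueue<int[]> minHeap = new PriorityQueue<>(
        Comparator.<int[]>comparingInt(p -> p[0]).thenComparingInt(p -> p[1]));
    for (int pool : pools) {
      minHeap.add(new int[]{poolUsage.get(pool), pool});
    }
    for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
      if (replicaGroupToPool.containsKey(replicaGroupId)) {
        continue; // already kept its existing pool
      }
      int[] least = minHeap.remove();
      replicaGroupToPool.put(replicaGroupId, least[1]);
      least[0]++; // bump usage and re-insert to rotate the pool selection
      minHeap.add(least);
    }
    return replicaGroupToPool;
  }

  public static void main(String[] args) {
    // Replica group 0 previously lived in pool 2; keep it there, spread the rest.
    Map<Integer, Integer> existing = Map.of(0, 2);
    System.out.println(assignPools(3, List.of(1, 2), existing));
  }
}
```

The min-heap mirrors the `Pairs.IntPair` priority queue in the actual patch: whenever a pool is picked its count is incremented and re-inserted, so new replica groups rotate toward the least-loaded pool instead of always hashing to the same one.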

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
index ebf38d308f..13cc270954 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
@@ -122,7 +122,7 @@ public class InstanceAssignmentConfigUtils {
           replicaGroupStrategyConfig.getNumInstancesPerPartition(), 0, 0, minimizeDataMovement, null);
     }
 
-    return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig);
+    return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, minimizeDataMovement);
   }
 
   public static boolean isMirrorServerSetAssignment(TableConfig tableConfig,
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index ed9d605af0..74f857a102 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -212,7 +212,7 @@ public class TableConfigSerDeTest {
       InstanceAssignmentConfig instanceAssignmentConfig =
           new InstanceAssignmentConfig(new InstanceTagPoolConfig("tenant_OFFLINE", true, 3, null),
               new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")),
-              new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false, null));
+              new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false, null), null, false);
       TableConfig tableConfig = tableConfigBuilder.setInstanceAssignmentConfigMap(
           Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig)).build();
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
index 294971615a..de96d4da4d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
@@ -50,8 +50,8 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector
   private static final Logger LOGGER = LoggerFactory.getLogger(FDAwareInstancePartitionSelector.class);
 
   public FDAwareInstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
-      String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions) {
-    super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions);
+      String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
+    super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
   }
 
   /**
@@ -152,7 +152,7 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector
        * initialize the new replicaGroupBasedAssignmentState for assignment,
        * place existing instances in their corresponding positions
        */
-      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
         int numExistingReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
         int numExistingPartitions = _existingInstancePartitions.getNumPartitions();
         /*
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index 6d869b86c1..09866c1ed7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -64,8 +64,8 @@ public class InstanceAssignmentDriver {
   }
 
   public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
-      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, @Nullable
-      InstancePartitions preConfiguredInstancePartitions) {
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions,
+      @Nullable InstancePartitions preConfiguredInstancePartitions) {
     String tableNameWithType = _tableConfig.getTableName();
     InstanceAssignmentConfig assignmentConfig =
         InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
@@ -88,8 +88,10 @@ public class InstanceAssignmentDriver {
     String tableNameWithType = _tableConfig.getTableName();
     LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType);
 
+    boolean minimizeDataMovement = instanceAssignmentConfig.isMinimizeDataMovement();
     InstanceTagPoolSelector tagPoolSelector =
-        new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType);
+        new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType,
+            minimizeDataMovement, existingInstancePartitions);
     Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = tagPoolSelector.selectInstances(instanceConfigs);
 
     InstanceConstraintConfig constraintConfig = instanceAssignmentConfig.getConstraintConfig();
@@ -106,7 +108,7 @@ public class InstanceAssignmentDriver {
     InstancePartitionSelector instancePartitionSelector =
         InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
             instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions,
-            preConfiguredInstancePartitions);
+            preConfiguredInstancePartitions, minimizeDataMovement);
     InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName);
     instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
     return instancePartitions;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
index 396b869924..5f92db2426 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
@@ -29,12 +29,14 @@ abstract class InstancePartitionSelector {
   protected final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig;
   protected final String _tableNameWithType;
   protected final InstancePartitions _existingInstancePartitions;
+  protected final boolean _minimizeDataMovement;
 
   public InstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
-      String tableNameWithType, InstancePartitions existingInstancePartitions) {
+      String tableNameWithType, InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
     _replicaGroupPartitionConfig = replicaGroupPartitionConfig;
     _tableNameWithType = tableNameWithType;
     _existingInstancePartitions = existingInstancePartitions;
+    _minimizeDataMovement = minimizeDataMovement;
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
index 256aa89b02..8a343b1598 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.helix.core.assignment.instance;
 
 import java.util.Arrays;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
@@ -31,25 +32,18 @@ public class InstancePartitionSelectorFactory {
 
   public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
       InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType,
-      InstancePartitions existingInstancePartitions) {
-    return getInstance(partitionSelector, instanceReplicaGroupPartitionConfig, tableNameWithType,
-        existingInstancePartitions, null);
-  }
-
-  public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
-      InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType,
-      InstancePartitions existingInstancePartitions, InstancePartitions preConfiguredInstancePartitions
-  ) {
+      InstancePartitions existingInstancePartitions, @Nullable InstancePartitions preConfiguredInstancePartitions,
+      boolean minimizeDataMovement) {
     switch (partitionSelector) {
       case FD_AWARE_INSTANCE_PARTITION_SELECTOR:
         return new FDAwareInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
-            existingInstancePartitions);
+            existingInstancePartitions, minimizeDataMovement);
       case INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR:
         return new InstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
-            existingInstancePartitions);
+            existingInstancePartitions, minimizeDataMovement);
       case MIRROR_SERVER_SET_PARTITION_SELECTOR:
         return new MirrorServerSetInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
-            existingInstancePartitions, preConfiguredInstancePartitions);
+            existingInstancePartitions, preConfiguredInstancePartitions, minimizeDataMovement);
       default:
         throw new IllegalStateException("Unexpected PartitionSelector: " + partitionSelector + ", should be from"
             + Arrays.toString(InstanceAssignmentConfig.PartitionSelector.values()));
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index de1e681d17..79e95db7a6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -22,18 +22,21 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.apache.pinot.spi.utils.Pairs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,8 +49,8 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
   private static final Logger LOGGER = LoggerFactory.getLogger(InstanceReplicaGroupPartitionSelector.class);
 
   public InstanceReplicaGroupPartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
-      String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions) {
-    super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions);
+      String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
+    super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
   }
 
   /**
@@ -73,16 +76,65 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
 
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+        // Keep the same pool for the replica group if it's already been used for the table.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+        int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
+        for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
+          boolean foundExistingReplicaGroup = false;
+          for (int partitionId = 0; partitionId < existingNumPartitions && !foundExistingReplicaGroup; partitionId++) {
+            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            for (String existingInstance : existingInstances) {
+              Integer existingPool = instanceToPoolMap.get(existingInstance);
+              if (existingPool != null && pools.contains(existingPool)) {
+                poolToReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new ArrayList<>()).add(replicaGroupId);
+                replicaGroupIdToPoolMap.put(replicaGroupId, existingPool);
+                foundExistingReplicaGroup = true;
+                break;
+              }
+            }
+          }
+        }
+        // Use a min heap to track the least frequently picked pool among all the pools
+        PriorityQueue<Pairs.IntPair> minHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator());
+        for (int pool : pools) {
+          int numExistingReplicaGroups =
+              poolToReplicaGroupIdsMap.get(pool) != null ? poolToReplicaGroupIdsMap.get(pool).size() : 0;
+          minHeap.add(new Pairs.IntPair(numExistingReplicaGroups, pool));
+        }
+        for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
+          if (replicaGroupIdToPoolMap.containsKey(replicaId)) {
+            continue;
+          }
+          // Increment the frequency for a given pool and put it back to the min heap to rotate the pool selection.
+          Pairs.IntPair pair = minHeap.remove();
+          int pool = pair.getRight();
+          pair.setLeft(pair.getLeft() + 1);
+          minHeap.add(pair);
+          poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
+          replicaGroupIdToPoolMap.put(replicaId, pool);
+        }
+      } else {
+        // Current default way to assign pool to replica groups.
+        for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
+          // Pick one pool for each replica-group based on the table name hash
+          int pool = pools.get((tableNameHash + replicaId) % numPools);
+          poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
+          replicaGroupIdToPoolMap.put(replicaId, pool);
+        }
       }
       LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
           _tableNameWithType);
@@ -132,7 +184,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
       LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
           numPartitions, numInstancesPerPartition, _tableNameWithType);
 
-      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
         // Minimize data movement.
         int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
         int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
@@ -257,7 +309,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
       }
 
       List<String> instancesToSelect;
-      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
         // Minimize data movement.
         List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0);
         LinkedHashSet<String> candidateInstances = new LinkedHashSet<>();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 5aefd1ad69..755e7aa713 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -21,11 +21,15 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
 import org.slf4j.Logger;
@@ -41,9 +45,17 @@ public class InstanceTagPoolSelector {
   private final InstanceTagPoolConfig _tagPoolConfig;
   private final String _tableNameWithType;
 
-  public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType) {
+  private final boolean _minimizeDataMovement;
+
+  private final InstancePartitions _existingInstancePartitions;
+
+  public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType,
+      boolean minimizeDataMovement,
+      @Nullable InstancePartitions existingInstancePartitions) {
     _tagPoolConfig = tagPoolConfig;
     _tableNameWithType = tableNameWithType;
+    _minimizeDataMovement = minimizeDataMovement;
+    _existingInstancePartitions = existingInstancePartitions;
   }
 
   /**
@@ -70,12 +82,14 @@ public class InstanceTagPoolSelector {
     if (_tagPoolConfig.isPoolBased()) {
       // Pool based selection
 
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
       // Extract the pool information from the instance configs
       for (InstanceConfig instanceConfig : candidateInstanceConfigs) {
         Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
         if (poolMap != null && poolMap.containsKey(tag)) {
           int pool = Integer.parseInt(poolMap.get(tag));
           poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(instanceConfig);
+          instanceToPoolMap.put(instanceConfig.getInstanceName(), pool);
         }
       }
       Preconditions.checkState(!poolToInstanceConfigsMap.isEmpty(),
@@ -96,9 +110,8 @@ public class InstanceTagPoolSelector {
         int numPools = poolToInstanceConfigsMap.size();
         int numPoolsToSelect = _tagPoolConfig.getNumPools();
         if (numPoolsToSelect > 0) {
-          Preconditions
-              .checkState(numPoolsToSelect <= numPools, "Not enough instance pools (%s in the cluster, asked for %s)",
-                  numPools, numPoolsToSelect);
+          Preconditions.checkState(numPoolsToSelect <= numPools,
+              "Not enough instance pools (%s in the cluster, asked for %s)", numPools, numPoolsToSelect);
         } else {
           numPoolsToSelect = numPools;
         }
@@ -109,11 +122,45 @@ public class InstanceTagPoolSelector {
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Set<Integer> existingPools = new HashSet<>(numPoolsToSelect);
+          // Keep the same pool if it's already been used for the table.
+          int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+            boolean foundExistingPoolForReplicaGroup = false;
+            for (int partitionId = 0; partitionId < existingNumPartitions && !foundExistingPoolForReplicaGroup;
+                partitionId++) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null && pools.contains(existingPool)) {
+                  poolsToSelect.add(existingPool);
+                  existingPools.add(existingPool);
+                  foundExistingPoolForReplicaGroup = true;
+                  break;
+                }
+              }
+            }
+          }
+          LOGGER.info("Keep the same pools: {} for table: {}", existingPools, _tableNameWithType);
+          // Pick pools from the remaining ones that haven't been used for the table before.
+          List<Integer> remainingPools = new ArrayList<>(pools);
+          remainingPools.removeAll(existingPools);
+          // Fill the remaining slots, skipping the pools kept above.
+          for (int i = poolsToSelect.size(); i < numPoolsToSelect; i++) {
+            if (remainingPools.isEmpty()) {
+              break;
+            }
+            poolsToSelect.add(remainingPools.remove(i % remainingPools.size()));
+          }
+        } else {
+          // Select pools based on the table name hash to evenly distribute the tables
+          List<Integer> poolsInCluster = new ArrayList<>(pools);
+          for (int i = 0; i < numPoolsToSelect; i++) {
+            poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+          }
         }
       }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
index 6b4086615a..f273866eeb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
@@ -76,8 +76,8 @@ public class MirrorServerSetInstancePartitionSelector extends InstancePartitionS
 
   public MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
       String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions,
-      InstancePartitions preConfiguredInstancePartitions) {
-    super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions);
+      InstancePartitions preConfiguredInstancePartitions, boolean minimizeDataMovement) {
+    super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
     _preConfiguredInstancePartitions = preConfiguredInstancePartitions;
     _numTargetInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
     _numTargetReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
index dedc79384e..9feb8844c8 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
@@ -118,7 +118,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
     // Add OFFLINE instance assignment config to the offline table config
     InstanceAssignmentConfig offlineInstanceAssignmentConfig = new InstanceAssignmentConfig(
         new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
-        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null));
+        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null), null, false);
     offlineTableConfig.setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig));
     _helixResourceManager.setExistingTableConfig(offlineTableConfig);
@@ -136,7 +136,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
     // Add CONSUMING instance assignment config to the real-time table config
     InstanceAssignmentConfig consumingInstanceAssignmentConfig = new InstanceAssignmentConfig(
         new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
-        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null));
+        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null), null, false);
     realtimeTableConfig.setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.CONSUMING.toString(), consumingInstanceAssignmentConfig));
     _helixResourceManager.setExistingTableConfig(realtimeTableConfig);
@@ -164,7 +164,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
             null)));
     InstanceAssignmentConfig tierInstanceAssignmentConfig = new InstanceAssignmentConfig(
         new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
-        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null));
+        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null), null, false);
     Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = new HashMap<>();
     instanceAssignmentConfigMap.put(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig);
     instanceAssignmentConfigMap.put(TIER_NAME, tierInstanceAssignmentConfig);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index b25a529e10..a6220c00a2 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -374,7 +374,7 @@ public class InstanceAssignmentTest {
       TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
           .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
               new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                  InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                  InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
           .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured"))
           .build();
       InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
@@ -480,7 +480,7 @@ public class InstanceAssignmentTest {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
     InstancePartitions preConfigured = new InstancePartitions("preConfigured");
@@ -561,7 +561,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -664,7 +664,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -756,7 +756,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -851,7 +851,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -956,7 +956,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -1063,7 +1063,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -1156,7 +1156,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -1230,7 +1230,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -1311,7 +1311,7 @@ public class InstanceAssignmentTest {
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false, null);
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-            new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))).build();
+            new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false))).build();
     InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
@@ -1364,7 +1364,7 @@ public class InstanceAssignmentTest {
     // Select all 3 pools in pool selection
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
     // All instances in pool 2 should be assigned to replica-group 0, and all instances in pool 0 should be assigned to
@@ -1386,7 +1386,7 @@ public class InstanceAssignmentTest {
     // Select pool 0 and 1 in pool selection
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1));
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
@@ -1408,7 +1408,7 @@ public class InstanceAssignmentTest {
     numReplicaGroups = numPools;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // [pool0, pool1]
@@ -1438,7 +1438,7 @@ public class InstanceAssignmentTest {
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
     // Reset the instance configs to have only two pools.
     instanceConfigs.clear();
     numInstances = 10;
@@ -1487,7 +1487,7 @@ public class InstanceAssignmentTest {
     // Select pool 0 and 1 in pool selection
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1));
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
 
     // Get the latest existingInstancePartitions from last computation.
     existingInstancePartitions = instancePartitions;
@@ -1514,7 +1514,7 @@ public class InstanceAssignmentTest {
     numReplicaGroups = 3;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
 
     // Get the latest existingInstancePartitions from last computation.
     existingInstancePartitions = instancePartitions;
@@ -1593,7 +1593,7 @@ public class InstanceAssignmentTest {
     numReplicaGroups = 2;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
 
     // Get the latest existingInstancePartitions from last computation.
     existingInstancePartitions = instancePartitions;
@@ -1693,6 +1693,109 @@ public class InstanceAssignmentTest {
     assertEquals(instancePartitions.getInstances(0, 1),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
             SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // The tests below exercise minimizeDataMovement together with pool configs.
+    // Add a third pool with the same number of instances, while the tag-pool config still selects 2 pools.
+    numPools = 3;
+    numInstances = numPools * numInstancesPerPool;
+    for (int i = numInstances + 4; i < numInstances + 9; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      int pool = numPools - 1;
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2, but since minimizeDataMovement is enabled,
+    // the same pools are re-used.
+    // [pool0, pool1]
+    //  r0     r1
+    // Thus, the instance partition assignment remains the same as the previous one.
+    //     pool 0: [ i12, i4,  i0,  i1, i10 ]
+    //     pool 1: [  i7, i9, i11, i13,  i6 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // Set tag pool config to 3.
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Increasing the number of pools to select shouldn't change the instance assignment,
+    // as only 2 replica groups are needed.
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+    // But since pool 0 and pool 1 are already being used for the table, the number of replica groups remains at 2,
+    // so the 3rd pool (pool 2) won't be picked up.
+    // Thus, the instance partition assignment remains the same as the existing one.
+    // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
+    // replica-group 1
+    // Now in poolToInstancesMap:
+    //     pool 0: [ i12, i4,  i0,  i1, i10 ]
+    //     pool 1: [  i7, i9, i11, i13,  i6 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // Set replica group from 2 to 3
+    numReplicaGroups = 3;
+    replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Now that 1 more replica group is needed, Pool 2 will be chosen for the 3rd replica group
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+    // [pool0, pool1, pool2]
+    //  r0     r1     r2
+    // Each replica-group should have 2 instances assigned
+    // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+    // Latest instances from ZK:
+    //   pool 0: [ i3, i4, i0, i1, i2 ]
+    //   pool 1: [ i8, i9, i5, i6, i7 ]
+    //   pool 2: [ i22,i23,i19,i20,i21]
+    // Thus, the new assignment will become:
+    //   pool 0: [ i12, i4,  i0,  i1, i10 ]
+    //   pool 1: [  i7, i9, i11, i13,  i6 ]
+    //   pool 2: [ i22, i23, i19, i20,i21 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 22, SERVER_INSTANCE_ID_PREFIX + 23, SERVER_INSTANCE_ID_PREFIX + 19,
+            SERVER_INSTANCE_ID_PREFIX + 20, SERVER_INSTANCE_ID_PREFIX + 21));
   }
 
   @Test
@@ -1720,7 +1823,7 @@ public class InstanceAssignmentTest {
     InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // No instance with correct tag
     try {
@@ -1750,7 +1853,7 @@ public class InstanceAssignmentTest {
     // Enable pool
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // No instance has correct pool configured
     try {
@@ -1784,7 +1887,7 @@ public class InstanceAssignmentTest {
 
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 3, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // Ask for too many pools
     try {
@@ -1796,7 +1899,7 @@ public class InstanceAssignmentTest {
 
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 2));
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // Ask for pool that does not exist
     try {
@@ -1810,7 +1913,7 @@ public class InstanceAssignmentTest {
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(false, 6, 0, 0, 0, 0, false, null
     );
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // Ask for too many instances
     try {
@@ -1824,7 +1927,7 @@ public class InstanceAssignmentTest {
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 0, 0, false, null
     );
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // Number of replica-groups must be positive
     try {
@@ -1836,7 +1939,7 @@ public class InstanceAssignmentTest {
 
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 11, 0, 0, 0, false, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // Ask for too many replica-groups
     try {
@@ -1849,7 +1952,7 @@ public class InstanceAssignmentTest {
 
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 3, 0, 0, false, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // Ask for too many instances
     try {
@@ -1861,7 +1964,7 @@ public class InstanceAssignmentTest {
 
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 3, false, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // Ask for too many instances per partition
     try {
@@ -1874,7 +1977,7 @@ public class InstanceAssignmentTest {
 
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 0, false, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
     // pool0: [i3, i4, i0, i1, i2]
@@ -1914,7 +2017,8 @@ public class InstanceAssignmentTest {
     try {
       tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
           .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-              new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, "ILLEGAL_SELECTOR"))).build();
+              new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, "ILLEGAL_SELECTOR", false)))
+          .build();
     } catch (IllegalArgumentException e) {
       assertEquals(e.getMessage(),
           "No enum constant org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig.PartitionSelector"
@@ -1943,7 +2047,8 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
+        .build();
     driver = new InstanceAssignmentDriver(tableConfig);
     try {
       instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
@@ -1976,7 +2081,8 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
+        .build();
     driver = new InstanceAssignmentDriver(tableConfig);
     try {
       instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
@@ -2017,7 +2123,8 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
+        .build();
     driver = new InstanceAssignmentDriver(tableConfig);
     try {
       instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
@@ -2055,7 +2162,8 @@ public class InstanceAssignmentTest {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
+        .build();
     InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
 
     InstancePartitions instancePartitions =
@@ -2127,7 +2235,8 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true)))
+        .build();
     driver = new InstanceAssignmentDriver(tableConfig);
     // existingInstancePartitions = instancePartitions
     instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, instancePartitions);
@@ -2208,7 +2317,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
             Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
         .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerReplicaGroup))
         .setSegmentPartitionConfig(segmentPartitionConfig).build();
     driver = new InstanceAssignmentDriver(tableConfig);
@@ -2282,7 +2391,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
             .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
-                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2338,7 +2447,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
             .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
-                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2405,7 +2514,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
             .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
-                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2471,7 +2580,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
             .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
-                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2542,7 +2651,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
             .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
-                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2593,7 +2702,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
             .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
-                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2657,7 +2766,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
             .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
-                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
index 889206437f..2fdef27796 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
@@ -64,7 +64,7 @@ public class InstanceReplicaGroupPartitionSelectorTest {
         new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null);
 
     InstanceReplicaGroupPartitionSelector selector =
-        new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing);
+        new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing, true);
 
     String[] serverNames = {"rg0-0", "rg0-1", "rg1-0", "rg1-1"};
     String[] poolNumbers = {"0", "0", "1", "1"};
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 5d679c0380..1df7109ef2 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -195,7 +195,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
     InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
     _helixResourceManager.updateTableConfig(tableConfig);
 
     // No need to reassign instances because instances should be automatically assigned when updating the table config
@@ -481,7 +481,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
     InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(TIER_A_NAME,
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
     _helixResourceManager.updateTableConfig(tableConfig);
 
     rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
index 391ba4812d..ad4b22ecaf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
@@ -41,13 +41,17 @@ public class InstanceAssignmentConfig extends BaseJsonConfig {
       "Configuration for the instance replica-group and partition of the instance assignment (mandatory)")
   private final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig;
 
+  @JsonPropertyDescription("Configuration to minimize data movement for pool and instance assignment")
+  private final boolean _minimizeDataMovement;
+
   @JsonCreator
   public InstanceAssignmentConfig(
       @JsonProperty(value = "tagPoolConfig", required = true) InstanceTagPoolConfig tagPoolConfig,
       @JsonProperty("constraintConfig") @Nullable InstanceConstraintConfig constraintConfig,
       @JsonProperty(value = "replicaGroupPartitionConfig", required = true)
           InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
-      @JsonProperty("partitionSelector") @Nullable String partitionSelector) {
+      @JsonProperty("partitionSelector") @Nullable String partitionSelector,
+      @JsonProperty("minimizeDataMovement") boolean minimizeDataMovement) {
     Preconditions.checkArgument(tagPoolConfig != null, "'tagPoolConfig' must be configured");
     Preconditions
         .checkArgument(replicaGroupPartitionConfig != null, "'replicaGroupPartitionConfig' must be configured");
@@ -57,11 +61,7 @@ public class InstanceAssignmentConfig extends BaseJsonConfig {
     _partitionSelector =
         partitionSelector == null ? PartitionSelector.INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR
             : PartitionSelector.valueOf(partitionSelector);
-  }
-
-  public InstanceAssignmentConfig(InstanceTagPoolConfig tagPoolConfig, InstanceConstraintConfig constraintConfig,
-      InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig) {
-    this(tagPoolConfig, constraintConfig, replicaGroupPartitionConfig, null);
+    _minimizeDataMovement = minimizeDataMovement;
   }
 
   public PartitionSelector getPartitionSelector() {
@@ -81,6 +81,10 @@ public class InstanceAssignmentConfig extends BaseJsonConfig {
     return _replicaGroupPartitionConfig;
   }
 
+  public boolean isMinimizeDataMovement() {
+    return _minimizeDataMovement;
+  }
+
   public enum PartitionSelector {
     FD_AWARE_INSTANCE_PARTITION_SELECTOR, INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR,
     MIRROR_SERVER_SET_PARTITION_SELECTOR
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java
index adc72e8f1c..1bc40cba21 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java
@@ -56,6 +56,8 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig {
       "Name of the column used for partition, if not provided table level replica group will be used")
   private final String _partitionColumn;
 
+  // TODO: remove this config in the next official release
+  @Deprecated
   private final boolean _minimizeDataMovement;
 
   @JsonCreator
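
The test comments in the diff above (e.g. `Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2`) reflect the table-name hash offset used to rotate pool and instance selection per table. A minimal standalone sketch of that computation, for illustration only (the class and method names are not from the patch):

```java
// Illustrative sketch (not part of the patch): the table-name hash offset
// referenced by the test comments above, e.g.
// Math.abs("myTable_OFFLINE".hashCode()) % numPools.
public class PoolHashSketch {
  static int startOffset(String tableNameWithType, int size) {
    return Math.abs(tableNameWithType.hashCode()) % size;
  }

  public static void main(String[] args) {
    // Values match the test comments in the diff above.
    System.out.println(startOffset("myTable_OFFLINE", 2)); // 0
    System.out.println(startOffset("myTable_OFFLINE", 3)); // 2
    System.out.println(startOffset("myTable_OFFLINE", 5)); // 3
  }
}
```

Because the offset depends only on the table name and the pool/instance count, the same table keeps landing on the same starting position across recomputations, which is what makes the expected assignments in the test deterministic.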


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


(pinot) 03/05: Address PR comments

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 2a934ef00dcccb74a60312309300fcdbd013dd92
Author: jlli_LinkedIn <jl...@linkedin.com>
AuthorDate: Thu Nov 16 21:30:54 2023 -0800

    Address PR comments
---
 .../instance/InstanceTagPoolSelector.java          | 24 ++++++++++------------
 1 file changed, 11 insertions(+), 13 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 755e7aa713..2062a75209 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -22,11 +22,11 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstancePartitions;
@@ -50,8 +50,7 @@ public class InstanceTagPoolSelector {
   private final InstancePartitions _existingInstancePartitions;
 
   public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType,
-      boolean minimizeDataMovement,
-      @Nullable InstancePartitions existingInstancePartitions) {
+      boolean minimizeDataMovement, @Nullable InstancePartitions existingInstancePartitions) {
     _tagPoolConfig = tagPoolConfig;
     _tableNameWithType = tableNameWithType;
     _minimizeDataMovement = minimizeDataMovement;
@@ -124,7 +123,7 @@ public class InstanceTagPoolSelector {
 
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
         if (_minimizeDataMovement && _existingInstancePartitions != null) {
-          Set<Integer> existingPools = new HashSet<>(numPoolsToSelect);
+          Set<Integer> existingPools = new TreeSet<>();
           // Keep the same pool if it's already been used for the table.
           int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
           int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
@@ -135,9 +134,10 @@ public class InstanceTagPoolSelector {
               List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
               for (String existingInstance : existingInstances) {
                 Integer existingPool = instanceToPoolMap.get(existingInstance);
-                if (existingPool != null & pools.contains(existingPool)) {
-                  poolsToSelect.add(existingPool);
-                  existingPools.add(existingPool);
+                if (existingPool != null) {
+                  if (existingPools.add(existingPool)) {
+                    poolsToSelect.add(existingPool);
+                  }
                   foundExistingPoolForReplicaGroup = true;
                   break;
                 }
@@ -147,12 +147,10 @@ public class InstanceTagPoolSelector {
           LOGGER.info("Keep the same pool: {} for table: {}", existingPools, _tableNameWithType);
           // Pick a pool from remainingPools that isn't used before.
           List<Integer> remainingPools = new ArrayList<>(pools);
-          remainingPools.retainAll(existingPools);
-          // Skip selecting the existing pool.
-          for (int i = 0; i < numPoolsToSelect; i++) {
-            if (existingPools.contains(i)) {
-              continue;
-            }
+          remainingPools.removeAll(existingPools);
+          // Select from the remaining pools.
+          int remainingNumPoolsToSelect = numPoolsToSelect - poolsToSelect.size();
+          for (int i = 0; i < remainingNumPoolsToSelect; i++) {
             poolsToSelect.add(remainingPools.remove(i % remainingPools.size()));
           }
         } else {
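
The hunk above replaces the buggy `existingPool != null & pools.contains(existingPool)` check (bitwise `&`) and the skip-by-index loop with a `TreeSet`-based dedup of already-used pools, followed by filling the remaining slots from pools not used before. A standalone sketch of the fixed selection logic, with illustrative names (not the actual Pinot class):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Illustrative sketch of the corrected pool-selection flow in the diff above:
// keep pools already used by the table (deduplicated, first-seen order),
// then pick the rest from pools not used before.
public class PoolSelectSketch {
  static List<Integer> selectPools(List<Integer> pools, int numPoolsToSelect,
      List<Integer> existingPoolsInOrder) {
    List<Integer> poolsToSelect = new ArrayList<>(numPoolsToSelect);
    Set<Integer> existingPools = new TreeSet<>();
    for (Integer existingPool : existingPoolsInOrder) {
      // TreeSet.add returns false for duplicates, so each kept pool is added once
      if (existingPools.add(existingPool)) {
        poolsToSelect.add(existingPool);
      }
    }
    // Select the remaining pools from those not used before
    List<Integer> remainingPools = new ArrayList<>(pools);
    remainingPools.removeAll(existingPools);
    int remainingNumPoolsToSelect = numPoolsToSelect - poolsToSelect.size();
    for (int i = 0; i < remainingNumPoolsToSelect; i++) {
      poolsToSelect.add(remainingPools.remove(i % remainingPools.size()));
    }
    return poolsToSelect;
  }

  public static void main(String[] args) {
    // Pools 1 and 2 are kept (deduplicated); pool 0 fills the remaining slot.
    System.out.println(selectPools(Arrays.asList(0, 1, 2), 3, Arrays.asList(1, 1, 2)));
  }
}
```

The `remove(i % remainingPools.size())` rotation mirrors the patch: since `remove` shrinks the list, successive picks walk the unused pools rather than repeatedly taking the head.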


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


(pinot) 05/05: Enhance algorithm


commit a286bd868f434c72e32351890566d684ec14a378
Author: Xiaotian (Jackie) Jiang <ja...@gmail.com>
AuthorDate: Wed Jan 31 23:15:41 2024 -0800

    Enhance algorithm
---
 .../instance/FDAwareInstancePartitionSelector.java |   7 +-
 .../instance/InstancePartitionSelector.java        |  10 +-
 .../InstanceReplicaGroupPartitionSelector.java     | 768 +++++++++++----------
 .../instance/InstanceTagPoolSelector.java          |  56 +-
 .../instance/InstanceAssignmentTest.java           |  43 +-
 .../InstanceReplicaGroupPartitionSelectorTest.java | 141 ++--
 .../java/org/apache/pinot/spi/utils/Pairs.java     |  23 +-
 7 files changed, 562 insertions(+), 486 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
index de96d4da4d..89d64272e3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
@@ -109,10 +109,7 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector
     return new ImmutablePair<>(numReplicaGroups, numInstancesPerReplicaGroup);
   }
 
-  /**
-   * Selects instances based on the replica-group/partition config, and stores the result into the given instance
-   * partitions.
-   */
+  @Override
   public void selectInstances(Map<Integer, List<InstanceConfig>> faultDomainToInstanceConfigsMap,
       InstancePartitions instancePartitions) {
 
@@ -152,7 +149,7 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector
        * initialize the new replicaGroupBasedAssignmentState for assignment,
        * place existing instances in their corresponding positions
        */
-      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+      if (_minimizeDataMovement) {
         int numExistingReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
         int numExistingPartitions = _existingInstancePartitions.getNumPartitions();
         /*
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
index 335070b003..b80ad8bba9 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
 
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
@@ -32,13 +33,14 @@ abstract class InstancePartitionSelector {
   protected final boolean _minimizeDataMovement;
 
   public InstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
-      String tableNameWithType, InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
+      String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
     _replicaGroupPartitionConfig = replicaGroupPartitionConfig;
     _tableNameWithType = tableNameWithType;
     _existingInstancePartitions = existingInstancePartitions;
-    // For backward compatibility, enable minimize data movement when it is enabled in top level or instance
-    // partition selector level.
-    _minimizeDataMovement = minimizeDataMovement || replicaGroupPartitionConfig.isMinimizeDataMovement();
+    // For backward compatibility, enable minimize data movement when it is enabled in top level or instance partition
+    // selector level
+    _minimizeDataMovement = (minimizeDataMovement || replicaGroupPartitionConfig.isMinimizeDataMovement())
+        && existingInstancePartitions != null;
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index 505006f1d3..8da6dbe2f6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -19,24 +19,22 @@
 package org.apache.pinot.controller.helix.core.assignment.instance;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
-import org.apache.pinot.spi.utils.Pairs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,10 +51,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
     super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
   }
 
-  /**
-   * Selects instances based on the replica-group/partition config, and stores the result into the given instance
-   * partitions.
-   */
+  @Override
   public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
       InstancePartitions instancePartitions) {
     int numPools = poolToInstanceConfigsMap.size();
@@ -65,393 +60,448 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
     int tableNameHash = Math.abs(_tableNameWithType.hashCode());
     List<Integer> pools = new ArrayList<>(poolToInstanceConfigsMap.keySet());
     pools.sort(null);
-    LOGGER.info("Starting instance replica-group/partition selection for table: {} with hash: {} from pools: {}",
-        _tableNameWithType, tableNameHash, pools);
+    LOGGER.info("Starting instance replica-group/partition selection for table: {} with hash: {} from pools: {}, "
+        + "minimize data movement: {}", _tableNameWithType, tableNameHash, pools, _minimizeDataMovement);
 
     if (_replicaGroupPartitionConfig.isReplicaGroupBased()) {
-      // Replica-group based selection
-
-      int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
-      Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive");
-      Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
-      Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
-      Map<Integer, Set<Integer>> existingPoolToExistingReplicaGroupIdsMap = new TreeMap<>();
-      Map<Integer, Set<String>> existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
-      Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
-      Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      Map<String, Integer> instanceToPoolMap = new HashMap<>();
-      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
-        Integer pool = entry.getKey();
-        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
-        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
-          String instanceName = instanceConfig.getInstanceName();
-          candidateInstances.add(instanceName);
-          instanceToPoolMap.put(instanceName, pool);
-        }
+      if (_minimizeDataMovement) {
+        replicaGroupBasedMinimumMovement(poolToInstanceConfigsMap, instancePartitions, pools, tableNameHash);
+      } else {
+        replicaGroupBasedSimple(poolToInstanceConfigsMap, instancePartitions, pools, tableNameHash);
       }
+    } else {
+      nonReplicaGroupBased(poolToInstanceConfigsMap, instancePartitions, pools, tableNameHash);
+    }
+  }
 
-      if (_minimizeDataMovement && _existingInstancePartitions != null) {
-        // Collect the stats between the existing pools, existing replica groups, and existing instances.
-        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
-        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
-        for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
-          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
-            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
-            for (String existingInstance : existingInstances) {
-              Integer existingPool = instanceToPoolMap.get(existingInstance);
-              if (existingPool != null) {
-                existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
-                    .add(existingInstance);
-                existingPoolToExistingReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new HashSet<>())
-                    .add(replicaGroupId);
-                existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
-                    .add(existingInstance);
-              }
-            }
-          }
-        }
-
-        // Use a max heap to track the number of servers used for the given pools,
-        // so that pool with max number of existing instances will be considered first.
-        PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
-        for (int pool : pools) {
-          maxHeap.add(
-              new Pairs.IntPair(existingPoolsToExistingInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).size(),
-                  pool));
-        }
+  private void nonReplicaGroupBased(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
+      InstancePartitions instancePartitions, List<Integer> pools, int tableNameHash) {
+    // Pick one pool based on the table name hash
+    int pool = pools.get(Math.abs(tableNameHash % pools.size()));
+    LOGGER.info("Selecting pool: {} for table: {}", pool, _tableNameWithType);
+    List<InstanceConfig> instanceConfigs = poolToInstanceConfigsMap.get(pool);
+    int numInstances = instanceConfigs.size();
 
-        // Get the maximum number of replica groups per pool.
-        int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size();
-        // Given a pool number, assign replica group which has the max number of existing instances.
-        // Repeat this process until the max number of replica groups per pool is reached.
-        while (!maxHeap.isEmpty()) {
-          Pairs.IntPair pair = maxHeap.remove();
-          int poolNumber = pair.getRight();
-          for (int i = 0; i < maxNumberOfReplicaGroupPerPool; i++) {
-            Set<Integer> existingReplicaGroups = existingPoolToExistingReplicaGroupIdsMap.get(poolNumber);
-            if (existingReplicaGroups == null || existingReplicaGroups.isEmpty()) {
-              continue;
-            }
-            int targetReplicaGroupId = -1;
-            int maxNumInstances = 0;
-            for (int existingReplicaGroupId : existingReplicaGroups) {
-              int numExistingInstances =
-                  existingReplicaGroupIdToExistingInstancesMap.getOrDefault(existingReplicaGroupId, new HashSet<>())
-                      .size();
-              if (numExistingInstances > maxNumInstances) {
-                maxNumInstances = numExistingInstances;
-                targetReplicaGroupId = existingReplicaGroupId;
-              }
-            }
-            // If target existing replica group cannot be found, it means it should be chosen from a new replica group.
-            if (targetReplicaGroupId > -1) {
-              poolToReplicaGroupIdsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(targetReplicaGroupId);
-              replicaGroupIdToPoolMap.put(targetReplicaGroupId, poolNumber);
-              // Clear the stats so that the same replica group won't be picked up again in later iteration.
-              existingReplicaGroupIdToExistingInstancesMap.get(targetReplicaGroupId).clear();
-            }
-          }
-        }
+    // Assign all instances if not configured
+    int numInstancesToSelect = _replicaGroupPartitionConfig.getNumInstances();
+    if (numInstancesToSelect > 0) {
+      Preconditions.checkState(numInstancesToSelect <= numInstances,
+          "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstances,
+          numInstancesToSelect);
+    } else {
+      numInstancesToSelect = numInstances;
+    }
 
-        // If there is any new replica group added, choose pool which is least frequently picked up.
-        // Use a min heap to track the least frequently picked pool among all the pools.
-        PriorityQueue<Pairs.IntPair> minHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator());
-        for (int pool : pools) {
-          int numExistingReplicaGroups =
-              poolToReplicaGroupIdsMap.get(pool) != null ? poolToReplicaGroupIdsMap.get(pool).size() : 0;
-          minHeap.add(new Pairs.IntPair(numExistingReplicaGroups, pool));
-        }
-        for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-          if (replicaGroupIdToPoolMap.containsKey(replicaId)) {
-            continue;
-          }
-          // Increment the frequency for a given pool and put it back to the min heap to rotate the pool selection.
-          Pairs.IntPair pair = minHeap.remove();
-          int pool = pair.getRight();
-          pair.setLeft(pair.getLeft() + 1);
-          minHeap.add(pair);
-          poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-          replicaGroupIdToPoolMap.put(replicaId, pool);
-        }
-      } else {
-        // Current default way to assign pool to replica groups.
-        for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-          // Pick one pool for each replica-group based on the table name hash
-          int pool = pools.get((tableNameHash + replicaId) % numPools);
-          poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-          replicaGroupIdToPoolMap.put(replicaId, pool);
-        }
-      }
-      LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
-          _tableNameWithType);
-
-      int numInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
-      if (numInstancesPerReplicaGroup > 0) {
-        // Check if we have enough instances if number of instances per replica-group is configured
-        for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
-          int pool = entry.getKey();
-          int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
-          int numInstancesToSelect = numInstancesPerReplicaGroup * entry.getValue().size();
-          Preconditions.checkState(numInstancesToSelect <= numInstancesInPool,
-              "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstancesInPool,
-              numInstancesToSelect);
-        }
-      } else {
-        // Use as many instances as possible if number of instances per replica-group is not configured
-        numInstancesPerReplicaGroup = Integer.MAX_VALUE;
-        for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
-          int pool = entry.getKey();
-          int numReplicaGroupsInPool = entry.getValue().size();
-          int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
-          Preconditions.checkState(numReplicaGroupsInPool <= numInstancesInPool,
-              "Not enough qualified instances from pool: %s, cannot select %s replica-groups from %s instances", pool,
-              numReplicaGroupsInPool, numInstancesInPool);
-          numInstancesPerReplicaGroup =
-              Math.min(numInstancesPerReplicaGroup, numInstancesInPool / numReplicaGroupsInPool);
-        }
+    List<String> instancesToSelect;
+    if (_minimizeDataMovement) {
+      List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0);
+      LinkedHashSet<String> candidateInstances = Sets.newLinkedHashSetWithExpectedSize(instanceConfigs.size());
+      instanceConfigs.forEach(k -> candidateInstances.add(k.getInstanceName()));
+      instancesToSelect =
+          selectInstancesWithMinimumMovement(numInstancesToSelect, candidateInstances, existingInstances);
+      LOGGER.info("Selecting instances: {} for table: {}, existing instances: {}", instancesToSelect,
+          _tableNameWithType, existingInstances);
+    } else {
+      instancesToSelect = new ArrayList<>(numInstancesToSelect);
+      for (int i = 0; i < numInstancesToSelect; i++) {
+        instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
       }
-      LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup,
-          _tableNameWithType);
+      LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType);
+    }
+    // Set the instances as partition 0 replica 0
+    instancePartitions.setInstances(0, 0, instancesToSelect);
+  }
+
+  /**
+   * Selects the instances with minimum movement.
+   * For each instance in the existing instances, if it is still alive, keep it in the same position. Then fill the
+   * vacant positions with the remaining candidate instances.
+   * NOTE: This method will modify the candidate instances.
+   */
+  private static List<String> selectInstancesWithMinimumMovement(int numInstancesToSelect,
+      LinkedHashSet<String> candidateInstances, List<String> existingInstances) {
+    // Initialize the list with empty positions to fill
+    List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
+    for (int i = 0; i < numInstancesToSelect; i++) {
+      instancesToSelect.add(null);
+    }
 
-      // Assign instances within a replica-group to one partition if not configured
-      int numPartitions = _replicaGroupPartitionConfig.getNumPartitions();
-      if (numPartitions <= 0) {
-        numPartitions = 1;
+    // Keep the existing instances that are still alive
+    int numInstancesToCheck = Math.min(numInstancesToSelect, existingInstances.size());
+    for (int i = 0; i < numInstancesToCheck; i++) {
+      String existingInstance = existingInstances.get(i);
+      if (candidateInstances.remove(existingInstance)) {
+        instancesToSelect.set(i, existingInstance);
       }
-      // Assign all instances within a replica-group to each partition if not configured
-      int numInstancesPerPartition = _replicaGroupPartitionConfig.getNumInstancesPerPartition();
-      if (numInstancesPerPartition > 0) {
-        Preconditions.checkState(numInstancesPerPartition <= numInstancesPerReplicaGroup,
-            "Number of instances per partition: %s must be smaller or equal to number of instances per replica-group:"
-                + " %s", numInstancesPerPartition, numInstancesPerReplicaGroup);
-      } else {
-        numInstancesPerPartition = numInstancesPerReplicaGroup;
+    }
+
+    // Fill the vacant positions with the remaining candidate instances
+    Iterator<String> iterator = candidateInstances.iterator();
+    for (int i = 0; i < numInstancesToSelect; i++) {
+      if (instancesToSelect.get(i) == null) {
+        instancesToSelect.set(i, iterator.next());
       }
-      LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
-          numPartitions, numInstancesPerPartition, _tableNameWithType);
-
-      if (_minimizeDataMovement && _existingInstancePartitions != null) {
-        // Minimize data movement.
-        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
-        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
-        int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
-
-        existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
-        // Step 1: find out the replica groups and their existing instances,
-        //   so that these instances can be filtered out and won't be chosen for the other replica group.
-        for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
-          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
-          if (pool == null) {
-            // Skip the replica group if it's no longer needed.
-            continue;
-          }
+    }
 
-          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
-            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
-            existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
-                .addAll(existingInstances);
-          }
-        }
+    return instancesToSelect;
+  }
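The `selectInstancesWithMinimumMovement` helper added above can be illustrated standalone. The following is a hypothetical sketch (class and instance names are made up, not part of the patch): each still-alive existing instance keeps its original slot, and the slots left by dead instances are back-filled from the insertion-ordered candidate set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;

public class MinimumMovementSketch {
  static List<String> select(int numToSelect, LinkedHashSet<String> candidates, List<String> existing) {
    // Initialize the list with empty positions to fill
    List<String> result = new ArrayList<>(Collections.nCopies(numToSelect, null));
    // Keep the existing instances that are still alive (remove() is true only for live candidates)
    int numToCheck = Math.min(numToSelect, existing.size());
    for (int i = 0; i < numToCheck; i++) {
      if (candidates.remove(existing.get(i))) {
        result.set(i, existing.get(i));
      }
    }
    // Fill the vacant positions with the remaining candidate instances
    Iterator<String> it = candidates.iterator();
    for (int i = 0; i < numToSelect; i++) {
      if (result.get(i) == null) {
        result.set(i, it.next());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // i1 has left the pool; i0 and i2 keep their positions, i1's slot is back-filled with i3
    LinkedHashSet<String> candidates = new LinkedHashSet<>(Arrays.asList("i0", "i2", "i3"));
    List<String> selected = select(3, candidates, Arrays.asList("i0", "i1", "i2"));
    System.out.println(selected); // [i0, i3, i2]
  }
}
```

Note that, as the javadoc above warns, the candidate set is mutated, which is why the caller passes a fresh `LinkedHashSet` per selection.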
 
-        for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
-          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
-          // Step 2: filter out instances that belong to other replica groups which should not be the candidate.
-          LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
-          for (int otherReplicaGroupId = 0;
-              otherReplicaGroupId < existingNumReplicaGroups && otherReplicaGroupId < numReplicaGroups;
-              otherReplicaGroupId++) {
-            if (replicaGroupId != otherReplicaGroupId) {
-              candidateInstances.removeAll(existingReplicaGroupIdToExistingInstancesMap.get(otherReplicaGroupId));
-            }
-          }
-          LinkedHashSet<String> chosenCandidateInstances = new LinkedHashSet<>();
-          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
-            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
-            // Step 3: figure out the missing instances and the new instances to fill their vacant positions.
-            List<String> instancesToSelect =
-                getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances);
-            chosenCandidateInstances.addAll(instancesToSelect);
-            instancePartitions.setInstances(partitionId, replicaGroupId, instancesToSelect);
-          }
-          // Remove instances that are already been chosen.
-          poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances);
-        }
+  private void replicaGroupBasedSimple(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
+      InstancePartitions instancePartitions, List<Integer> pools, int tableNameHash) {
+    int numPools = pools.size();
+    int numReplicaGroups = getNumReplicaGroups();
 
-        // If the new number of replica groups is greater than the existing number of replica groups.
-        for (int replicaGroupId = existingNumReplicaGroups; replicaGroupId < numReplicaGroups; replicaGroupId++) {
-          int pool = replicaGroupIdToPoolMap.get(replicaGroupId);
-          LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
-
-          Set<String> chosenCandidateInstances = new HashSet<>();
-          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
-            List<String> existingInstances = Collections.emptyList();
-            List<String> instancesToSelect =
-                getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances);
-            chosenCandidateInstances.addAll(instancesToSelect);
-            instancePartitions.setInstances(partitionId, replicaGroupId, instancesToSelect);
-          }
-          // Remove instances that are already been chosen.
-          poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances);
-        }
-      } else {
-        // Pick instances based on the sorted list of instance names.
-        String[][] replicaGroupIdToInstancesMap = new String[numReplicaGroups][numInstancesPerReplicaGroup];
-        for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
-          List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(entry.getKey());
-          List<Integer> replicaGroupIdsInPool = entry.getValue();
-
-          // Use round-robin to assign instances to each replica-group so that they get instances with similar picking
-          // priority
-          // E.g. (within a pool, 10 instances, 2 replica-groups, 3 instances per replica-group)
-          // [i0, i1, i2, i3, i4, i5, i6, i7, i8, i9]
-          //  r0  r1  r0  r1  r0  r1
-          int instanceIdInPool = 0;
-          for (int instanceIdInReplicaGroup = 0; instanceIdInReplicaGroup < numInstancesPerReplicaGroup;
-              instanceIdInReplicaGroup++) {
-            for (int replicaGroupId : replicaGroupIdsInPool) {
-              replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup] =
-                  instanceConfigsInPool.get(instanceIdInPool++).getInstanceName();
-            }
-          }
+    // Pick one pool for each replica-group based on the table name hash
+    Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+    int startIndex = Math.abs(tableNameHash % numPools);
+    for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+      int pool = pools.get((startIndex + replicaGroupId) % numPools);
+      poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaGroupId);
+    }
+    LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
+        _tableNameWithType);
+
+    int numInstancesPerReplicaGroup =
+        getNumInstancesPerReplicaGroup(poolToInstanceConfigsMap, poolToReplicaGroupIdsMap);
+    LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup,
+        _tableNameWithType);
+    int numPartitions = getNumPartitions();
+    int numInstancesPerPartition = getNumInstancesPerPartition(numInstancesPerReplicaGroup);
+    LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
+        numPartitions, numInstancesPerPartition, _tableNameWithType);
+
+    // Pick instances based on the sorted list of instance names
+    String[][] replicaGroupIdToInstancesMap = new String[numReplicaGroups][numInstancesPerReplicaGroup];
+    for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
+      List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(entry.getKey());
+      List<Integer> replicaGroupIdsInPool = entry.getValue();
+
+      // Use round-robin to assign instances to each replica-group so that they get instances with similar picking
+      // priority
+      // E.g. (within a pool, 10 instances, 2 replica-groups, 3 instances per replica-group)
+      // [i0, i1, i2, i3, i4, i5, i6, i7, i8, i9]
+      //  r0  r1  r0  r1  r0  r1
+      int instanceIdInPool = 0;
+      for (int instanceIdInReplicaGroup = 0; instanceIdInReplicaGroup < numInstancesPerReplicaGroup;
+          instanceIdInReplicaGroup++) {
+        for (int replicaGroupId : replicaGroupIdsInPool) {
+          replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup] =
+              instanceConfigsInPool.get(instanceIdInPool++).getInstanceName();
         }
+      }
+    }
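The round-robin pick described in the comment above can be sketched in isolation. This is a hypothetical reduction (names are invented for illustration): within one pool, the replica-groups take turns picking the next instance from the sorted list, so each group ends up with instances of similar picking priority.

```java
import java.util.Arrays;
import java.util.List;

public class RoundRobinSketch {
  static String[][] assign(List<String> instancesInPool, List<Integer> replicaGroupIds,
      int numInstancesPerReplicaGroup, int numReplicaGroups) {
    String[][] result = new String[numReplicaGroups][numInstancesPerReplicaGroup];
    int instanceId = 0;
    // For each slot, let every replica-group in the pool pick one instance before moving on
    for (int slot = 0; slot < numInstancesPerReplicaGroup; slot++) {
      for (int rg : replicaGroupIds) {
        result[rg][slot] = instancesInPool.get(instanceId++);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // 10 instances, replica-groups 0 and 1, 3 instances each:
    // r0 gets i0, i2, i4 and r1 gets i1, i3, i5, matching the example in the comment
    String[][] rgs = assign(
        Arrays.asList("i0", "i1", "i2", "i3", "i4", "i5", "i6", "i7", "i8", "i9"),
        Arrays.asList(0, 1), 3, 2);
    System.out.println(Arrays.toString(rgs[0]) + " " + Arrays.toString(rgs[1]));
  }
}
```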
 
-        // Assign consecutive instances within a replica-group to each partition.
-        // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition)
-        // [i0, i1, i2, i3, i4]
-        //  p0  p0  p0  p1  p1
-        //  p1  p2  p2  p2
-        for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
-          int instanceIdInReplicaGroup = 0;
-          for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
-            List<String> instancesInPartition = new ArrayList<>(numInstancesPerPartition);
-            for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition;
-                instanceIdInPartition++) {
-              instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
-              instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup;
-            }
-            LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}",
-                instancesInPartition, replicaGroupId, partitionId, _tableNameWithType);
-            instancePartitions.setInstances(partitionId, replicaGroupId, instancesInPartition);
-          }
+    // Assign consecutive instances within a replica-group to each partition
+    // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition)
+    // [i0, i1, i2, i3, i4]
+    //  p0  p0  p0  p1  p1
+    //  p1  p2  p2  p2
+    for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+      String[] instancesInReplicaGroup = replicaGroupIdToInstancesMap[replicaGroupId];
+      int instanceIdInReplicaGroup = 0;
+      for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+        List<String> instances = new ArrayList<>(numInstancesPerPartition);
+        for (int i = 0; i < numInstancesPerPartition; i++) {
+          instances.add(instancesInReplicaGroup[instanceIdInReplicaGroup]);
+          instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup;
         }
+        LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}", instances,
+            replicaGroupId, partitionId, _tableNameWithType);
+        instancePartitions.setInstances(partitionId, replicaGroupId, instances);
       }
-    } else {
-      // Non-replica-group based selection
-
-      // Pick one pool based on the table name hash
-      int pool = pools.get(tableNameHash % numPools);
-      LOGGER.info("Selecting pool: {} for table: {}", pool, _tableNameWithType);
-      List<InstanceConfig> instanceConfigs = poolToInstanceConfigsMap.get(pool);
-      int numInstanceConfigs = instanceConfigs.size();
-
-      // Assign all instances if not configured
-      int numInstancesToSelect = _replicaGroupPartitionConfig.getNumInstances();
-      if (numInstancesToSelect > 0) {
-        Preconditions.checkState(numInstancesToSelect <= numInstanceConfigs,
-            "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstanceConfigs,
+    }
+  }
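The consecutive partition assignment at the end of `replicaGroupBasedSimple` can likewise be sketched standalone (a hypothetical reduction, not the committed code): walk the replica-group's instance array as a ring, handing `numInstancesPerPartition` consecutive instances to each partition and wrapping around when the array is exhausted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionAssignmentSketch {
  static List<List<String>> assign(String[] instancesInReplicaGroup, int numPartitions,
      int numInstancesPerPartition) {
    List<List<String>> partitions = new ArrayList<>(numPartitions);
    int idx = 0;
    for (int p = 0; p < numPartitions; p++) {
      List<String> instances = new ArrayList<>(numInstancesPerPartition);
      for (int i = 0; i < numInstancesPerPartition; i++) {
        instances.add(instancesInReplicaGroup[idx]);
        // Wrap around so a partition can reuse instances when the group is smaller
        // than numPartitions * numInstancesPerPartition
        idx = (idx + 1) % instancesInReplicaGroup.length;
      }
      partitions.add(instances);
    }
    return partitions;
  }

  public static void main(String[] args) {
    // 5 instances, 3 partitions, 3 instances per partition, matching the comment's example:
    // p0=[i0, i1, i2], p1=[i3, i4, i0], p2=[i1, i2, i3]
    List<List<String>> parts = assign(new String[]{"i0", "i1", "i2", "i3", "i4"}, 3, 3);
    System.out.println(parts);
  }
}
```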
+
+  private int getNumReplicaGroups() {
+    int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
+    Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive");
+    return numReplicaGroups;
+  }
+
+  private int getNumInstancesPerReplicaGroup(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
+      Map<Integer, List<Integer>> poolToReplicaGroupIdsMap) {
+    int numInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
+    if (numInstancesPerReplicaGroup > 0) {
+      // Check if we have enough instances if number of instances per replica-group is configured
+      for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
+        int pool = entry.getKey();
+        int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
+        int numInstancesToSelect = numInstancesPerReplicaGroup * entry.getValue().size();
+        Preconditions.checkState(numInstancesToSelect <= numInstancesInPool,
+            "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstancesInPool,
             numInstancesToSelect);
-      } else {
-        numInstancesToSelect = numInstanceConfigs;
       }
-
-      List<String> instancesToSelect;
-      if (_minimizeDataMovement && _existingInstancePartitions != null) {
-        // Minimize data movement.
-        List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0);
-        LinkedHashSet<String> candidateInstances = new LinkedHashSet<>();
-        instanceConfigs.forEach(k -> candidateInstances.add(k.getInstanceName()));
-        instancesToSelect =
-            getInstancesWithMinimumMovement(numInstancesToSelect, candidateInstances, existingInstances);
-      } else {
-        // Select instances sequentially.
-        instancesToSelect = new ArrayList<>(numInstancesToSelect);
-        for (int i = 0; i < numInstancesToSelect; i++) {
-          instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
-        }
+    } else {
+      // Use as many instances as possible if number of instances per replica-group is not configured
+      numInstancesPerReplicaGroup = Integer.MAX_VALUE;
+      for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
+        int pool = entry.getKey();
+        int numReplicaGroupsInPool = entry.getValue().size();
+        int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
+        Preconditions.checkState(numReplicaGroupsInPool <= numInstancesInPool,
+            "Not enough qualified instances from pool: %s, cannot select %s replica-groups from %s instances", pool,
+            numReplicaGroupsInPool, numInstancesInPool);
+        numInstancesPerReplicaGroup =
+            Math.min(numInstancesPerReplicaGroup, numInstancesInPool / numReplicaGroupsInPool);
       }
-      LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType);
-      // Set the instances as partition 0 replica 0
-      instancePartitions.setInstances(0, 0, instancesToSelect);
     }
+    return numInstancesPerReplicaGroup;
   }
 
-  /**
-   * Select instances with minimum movement.
-   * This algorithm can solve the following scenarios:
-   *    * swap an instance
-   *    * add/remove replica groups
-   *    * increase/decrease number of instances per replica group
-   * TODO: handle the scenarios that selected pools are changed.
-   * TODO: improve the algorithm by doing the following steps:
-   *         1. assign the existing instances for all partitions;
-   *         2. assign the vacant positions based on the partitions already assigned to each instance.
-   * @param numInstancesToSelect number of instances to select
-   * @param candidateInstances candidate instances to be selected
-   * @param existingInstances list of existing instances
-   */
-  private static List<String> getInstancesWithMinimumMovement(int numInstancesToSelect,
-      LinkedHashSet<String> candidateInstances, List<String> existingInstances) {
-    // Initialize the list with empty positions to fill.
-    List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
-    for (int i = 0; i < numInstancesToSelect; i++) {
-      instancesToSelect.add(null);
+  private int getNumPartitions() {
+    // Assign instances within a replica-group to one partition if not configured
+    int numPartitions = _replicaGroupPartitionConfig.getNumPartitions();
+    if (numPartitions <= 0) {
+      numPartitions = 1;
     }
-    Deque<String> newlyAddedInstances = new LinkedList<>();
+    return numPartitions;
+  }
+
+  private int getNumInstancesPerPartition(int numInstancesPerReplicaGroup) {
+    // Assign all instances within a replica-group to each partition if not configured
+    int numInstancesPerPartition = _replicaGroupPartitionConfig.getNumInstancesPerPartition();
+    if (numInstancesPerPartition > 0) {
+      Preconditions.checkState(numInstancesPerPartition <= numInstancesPerReplicaGroup,
+          "Number of instances per partition: %s must be smaller than or equal to number of instances per replica-group: %s",
+          numInstancesPerPartition, numInstancesPerReplicaGroup);
+    } else {
+      numInstancesPerPartition = numInstancesPerReplicaGroup;
+    }
+    return numInstancesPerPartition;
+  }
+
+  private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
+      InstancePartitions instancePartitions, List<Integer> pools, int tableNameHash) {
+    int numPools = pools.size();
+    int numReplicaGroups = getNumReplicaGroups();
 
-    // Find out the existing instances that are still alive.
-    Set<String> existingInstancesStillAlive = new HashSet<>();
-    for (String existingInstance : existingInstances) {
-      if (candidateInstances.contains(existingInstance)) {
-        existingInstancesStillAlive.add(existingInstance);
+    Map<String, Integer> instanceToPoolMap = new HashMap<>();
+    for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+      int pool = entry.getKey();
+      for (InstanceConfig instanceConfig : entry.getValue()) {
+        instanceToPoolMap.put(instanceConfig.getInstanceName(), pool);
       }
     }
 
-    // Find out the newly added instances.
-    for (String candidateInstance : candidateInstances) {
-      if (!existingInstancesStillAlive.contains(candidateInstance)) {
-        newlyAddedInstances.add(candidateInstance);
+    // Calculate the mapping from pool to replica-groups assigned to the pool
+    List<Set<String>> replicaGroupIdToExistingInstancesMap = new ArrayList<>(numReplicaGroups);
+    Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+    int maxReplicaGroupsPerPool = (numReplicaGroups + numPools - 1) / numPools;
+    int startIndex = Math.abs(tableNameHash % numPools);
+
+    int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+    int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+    for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+      // For each replica-group, gather number of existing instances within each pool
+      Set<String> existingInstanceSet = new HashSet<>();
+      replicaGroupIdToExistingInstancesMap.add(existingInstanceSet);
+      Map<Integer, Integer> poolToNumExistingInstancesMap = new TreeMap<>();
+      if (replicaGroupId < existingNumReplicaGroups) {
+        for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+          List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+          existingInstanceSet.addAll(existingInstances);
+          for (String existingInstance : existingInstances) {
+            Integer existingPool = instanceToPoolMap.get(existingInstance);
+            if (existingPool != null) {
+              poolToNumExistingInstancesMap.merge(existingPool, 1, Integer::sum);
+            }
+          }
+        }
+      }
+      // Sort the pools based on the number of existing instances in the pool in descending order, then use the table
+      // name hash to break ties
+      // Triple stores (pool, numExistingInstances, poolIndex) for sorting
+      List<Triple<Integer, Integer, Integer>> triples = new ArrayList<>(numPools);
+      for (int i = 0; i < numPools; i++) {
+        int pool = pools.get((startIndex + replicaGroupId + i) % numPools);
+        triples.add(Triple.of(pool, poolToNumExistingInstancesMap.getOrDefault(pool, 0), i));
+      }
+      triples.sort((o1, o2) -> {
+        int result = Integer.compare(o2.getMiddle(), o1.getMiddle());
+        return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight());
+      });
+      for (Triple<Integer, Integer, Integer> triple : triples) {
+        int pool = triple.getLeft();
+        List<Integer> replicaGroupIds = poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>());
+        if (replicaGroupIds.size() < maxReplicaGroupsPerPool) {
+          replicaGroupIds.add(replicaGroupId);
+          break;
+        }
       }
     }
+    LOGGER.info("Selecting {} replica-groups from pools: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
+        _tableNameWithType);
 
-    int numExistingInstances = existingInstances.size();
-    for (int i = 0; i < numInstancesToSelect; i++) {
-      String existingInstance = i < numExistingInstances ? existingInstances.get(i) : null;
-      String selectedInstance;
-      if (existingInstance != null && candidateInstances.contains(existingInstance)) {
-        selectedInstance = existingInstance;
-        existingInstancesStillAlive.remove(selectedInstance);
-      } else {
-        selectedInstance = newlyAddedInstances.poll();
+    int numInstancesPerReplicaGroup =
+        getNumInstancesPerReplicaGroup(poolToInstanceConfigsMap, poolToReplicaGroupIdsMap);
+    LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup,
+        _tableNameWithType);
+    int numPartitions = getNumPartitions();
+    int numInstancesPerPartition = getNumInstancesPerPartition(numInstancesPerReplicaGroup);
+    LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
+        numPartitions, numInstancesPerPartition, _tableNameWithType);
+
+    List<List<String>> replicaGroupIdToInstancesMap = new ArrayList<>(numReplicaGroups);
+    for (int i = 0; i < numReplicaGroups; i++) {
+      replicaGroupIdToInstancesMap.add(new ArrayList<>(numInstancesPerReplicaGroup));
+    }
+    for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
+      // For each pool, keep the existing instances that are still alive within each replica-group
+      int pool = entry.getKey();
+      List<Integer> replicaGroupIds = entry.getValue();
+      List<String> newInstances = new ArrayList<>();
+      for (InstanceConfig instanceConfig : poolToInstanceConfigsMap.get(pool)) {
+        String instanceName = instanceConfig.getInstanceName();
+        boolean isExistingInstance = false;
+        for (int replicaGroupId : replicaGroupIds) {
+          List<String> instances = replicaGroupIdToInstancesMap.get(replicaGroupId);
+          if (instances.size() == numInstancesPerReplicaGroup) {
+            continue;
+          }
+          if (replicaGroupIdToExistingInstancesMap.get(replicaGroupId).contains(instanceName)) {
+            instances.add(instanceName);
+            isExistingInstance = true;
+            break;
+          }
+        }
+        if (!isExistingInstance) {
+          newInstances.add(instanceName);
+        }
       }
-      instancesToSelect.set(i, selectedInstance);
-      // If it's an existing alive instance, or it's for a new replica group, add the new instance to the tail,
-      // so that it won't be firstly chosen for the next partition.
-      // For newly added instances to fill the existing replica group, the sequence cannot change;
-      // otherwise there is no guarantee that same vacant position will be filled with the same new instance.
-      // The 'selectedInstance' object can still be null if there is no new instances from the candidate list.
-      if (selectedInstance != null && (i < numExistingInstances || existingInstances.isEmpty())) {
-        candidateInstances.remove(selectedInstance);
-        candidateInstances.add(selectedInstance);
+      // Fill the vacant positions with the new instances. First fill the replica-groups with the fewest instances, then
+      // use round-robin to assign instances to each replica-group so that they get instances with similar picking
+      // priority.
+      int numInstancesToFill = numInstancesPerReplicaGroup * replicaGroupIds.size();
+      for (int replicaGroupId : replicaGroupIds) {
+        numInstancesToFill -= replicaGroupIdToInstancesMap.get(replicaGroupId).size();
+      }
+      for (int i = 0; i < numInstancesToFill; i++) {
+        int leastNumInstances = Integer.MAX_VALUE;
+        int replicaGroupIdWithLeastInstances = -1;
+        for (int replicaGroupId : replicaGroupIds) {
+          int numInstances = replicaGroupIdToInstancesMap.get(replicaGroupId).size();
+          if (numInstances < leastNumInstances) {
+            leastNumInstances = numInstances;
+            replicaGroupIdWithLeastInstances = replicaGroupId;
+          }
+        }
+        replicaGroupIdToInstancesMap.get(replicaGroupIdWithLeastInstances).add(newInstances.get(i));
       }
     }
 
-    // If there are still some vacant positions in the instance list,
-    // try to fill with instances which are either left over or newly added.
-    for (int i = 0; i < instancesToSelect.size(); i++) {
-      if (instancesToSelect.get(i) == null) {
-        if (!existingInstancesStillAlive.isEmpty()) {
-          Iterator<String> iterator = existingInstancesStillAlive.iterator();
-          String existingInstanceLeftOver = iterator.next();
-          instancesToSelect.set(i, existingInstanceLeftOver);
-          iterator.remove();
-        } else if (!newlyAddedInstances.isEmpty()) {
-          // pick a new instance to fill its vacant position.
-          String newInstance = newlyAddedInstances.pollFirst();
-          instancesToSelect.set(i, newInstance);
+    if (numPartitions == 1) {
+      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+        List<String> instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId);
+        if (replicaGroupId < existingNumReplicaGroups) {
+          List<String> existingInstances = _existingInstancePartitions.getInstances(0, replicaGroupId);
+          LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(instancesInReplicaGroup);
+          List<String> instances =
+              selectInstancesWithMinimumMovement(numInstancesPerReplicaGroup, candidateInstances, existingInstances);
+          LOGGER.info(
+              "Selecting instances: {} for replica-group: {}, partition: 0 for table: {}, existing instances: {}",
+              instances, replicaGroupId, _tableNameWithType, existingInstances);
+          instancePartitions.setInstances(0, replicaGroupId, instances);
+        } else {
+          LOGGER.info("Selecting instances: {} for replica-group: {}, partition: 0 for table: {}, "
+              + "there are no existing instances", instancesInReplicaGroup, replicaGroupId, _tableNameWithType);
+          instancePartitions.setInstances(0, replicaGroupId, instancesInReplicaGroup);
+        }
+      }
+    } else {
+      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+        List<String> instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId);
+        if (replicaGroupId < existingNumReplicaGroups) {
+          int maxNumPartitionsPerInstance = (numInstancesPerReplicaGroup + numPartitions - 1) / numPartitions;
+          Map<String, Integer> instanceToNumPartitionsMap =
+              Maps.newHashMapWithExpectedSize(numInstancesPerReplicaGroup);
+          for (String instance : instancesInReplicaGroup) {
+            instanceToNumPartitionsMap.put(instance, 0);
+          }
+
+          List<List<String>> partitionIdToInstancesMap = new ArrayList<>(numPartitions);
+          List<Set<String>> partitionIdToInstanceSetMap = new ArrayList<>(numPartitions);
+          List<List<String>> partitionIdToExistingInstancesMap = new ArrayList<>(existingNumPartitions);
+          for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+            // Initialize the list with empty positions to fill
+            List<String> instances = new ArrayList<>(numInstancesPerPartition);
+            for (int i = 0; i < numInstancesPerPartition; i++) {
+              instances.add(null);
+            }
+            partitionIdToInstancesMap.add(instances);
+            Set<String> instanceSet = Sets.newHashSetWithExpectedSize(numInstancesPerPartition);
+            partitionIdToInstanceSetMap.add(instanceSet);
+
+            // Keep the existing instances that are still alive
+            if (partitionId < existingNumPartitions) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              partitionIdToExistingInstancesMap.add(existingInstances);
+              int numInstancesToCheck = Math.min(numInstancesPerPartition, existingInstances.size());
+              for (int i = 0; i < numInstancesToCheck; i++) {
+                String existingInstance = existingInstances.get(i);
+                Integer numPartitionsOnInstance = instanceToNumPartitionsMap.get(existingInstance);
+                if (numPartitionsOnInstance != null && numPartitionsOnInstance < maxNumPartitionsPerInstance) {
+                  instances.set(i, existingInstance);
+                  instanceSet.add(existingInstance);
+                  instanceToNumPartitionsMap.put(existingInstance, numPartitionsOnInstance + 1);
+                }
+              }
+            }
+          }
+
+          // Fill the vacant positions with the instance that serves the fewest partitions
+          for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+            List<String> instances = partitionIdToInstancesMap.get(partitionId);
+            Set<String> instanceSet = partitionIdToInstanceSetMap.get(partitionId);
+            int numInstancesToFill = numInstancesPerPartition - instanceSet.size();
+            if (numInstancesToFill > 0) {
+              // Triple stores (instance, numPartitionsOnInstance, instanceIndex) for sorting
+              List<Triple<String, Integer, Integer>> triples = new ArrayList<>(numInstancesPerReplicaGroup);
+              for (int i = 0; i < numInstancesPerReplicaGroup; i++) {
+                String instance = instancesInReplicaGroup.get(i);
+                if (!instanceSet.contains(instance)) {
+                  triples.add(Triple.of(instance, instanceToNumPartitionsMap.get(instance), i));
+                }
+              }
+              triples.sort((o1, o2) -> {
+                int result = Integer.compare(o1.getMiddle(), o2.getMiddle());
+                return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight());
+              });
+              int instanceIdToFill = 0;
+              for (int i = 0; i < numInstancesPerPartition; i++) {
+                if (instances.get(i) == null) {
+                  String instance = triples.get(instanceIdToFill++).getLeft();
+                  instances.set(i, instance);
+                  instanceToNumPartitionsMap.put(instance, instanceToNumPartitionsMap.get(instance) + 1);
+                }
+              }
+            }
+
+            if (partitionId < existingNumPartitions) {
+              LOGGER.info(
+                  "Selecting instances: {} for replica-group: {}, partition: {} for table: {}, existing instances: {}",
+                  instances, replicaGroupId, partitionId, _tableNameWithType,
+                  partitionIdToExistingInstancesMap.get(partitionId));
+            } else {
+              LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}, "
+                  + "there are no existing instances", instances, replicaGroupId, partitionId, _tableNameWithType);
+            }
+            instancePartitions.setInstances(partitionId, replicaGroupId, instances);
+          }
+        } else {
+          // Assign consecutive instances within a replica-group to each partition
+          int instanceIdInReplicaGroup = 0;
+          for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+            List<String> instances = new ArrayList<>(numInstancesPerPartition);
+            for (int i = 0; i < numInstancesPerPartition; i++) {
+              instances.add(instancesInReplicaGroup.get(instanceIdInReplicaGroup));
+              instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup;
+            }
+            LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}, "
+                + "there are no existing instances", instances, replicaGroupId, partitionId, _tableNameWithType);
+            instancePartitions.setInstances(partitionId, replicaGroupId, instances);
+          }
         }
       }
     }
-    return instancesToSelect;
   }
 }
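[Editor's note, not part of the patch] The partition-filling loop above picks, for each vacant slot, the replica-group instance that currently serves the fewest partitions. A self-contained sketch of that selection rule, with hypothetical class and method names (the real code sorts `Triple`s of (instance, partitionCount, index) instead of scanning):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FillVacancySketch {
  // For each vacant (null) slot in a partition, pick the candidate instance that
  // currently serves the fewest partitions; ties are broken by candidate order.
  // Assumes there are at least as many candidates as slots in the partition.
  static void fillVacancies(List<String> slots, List<String> candidates, Map<String, Integer> partitionCounts) {
    for (int i = 0; i < slots.size(); i++) {
      if (slots.get(i) != null) {
        continue;
      }
      String best = null;
      for (String candidate : candidates) {
        if (slots.contains(candidate)) {
          continue; // already serving this partition
        }
        if (best == null || partitionCounts.get(candidate) < partitionCounts.get(best)) {
          best = candidate;
        }
      }
      slots.set(i, best);
      partitionCounts.merge(best, 1, Integer::sum);
    }
  }

  public static void main(String[] args) {
    // i0 already serves 2 partitions, i2 serves 1, i1 serves none.
    Map<String, Integer> counts = new HashMap<>(Map.of("i0", 2, "i1", 0, "i2", 1));
    List<String> slots = new ArrayList<>(Arrays.asList("i0", null));
    fillVacancies(slots, List.of("i0", "i1", "i2"), counts);
    System.out.println(slots); // [i0, i1] -- i1 serves the fewest partitions
  }
}
```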
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 940968432b..28d58bbbcd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -22,18 +22,17 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
-import org.apache.pinot.spi.utils.Pairs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,16 +45,14 @@ public class InstanceTagPoolSelector {
 
   private final InstanceTagPoolConfig _tagPoolConfig;
   private final String _tableNameWithType;
-
   private final boolean _minimizeDataMovement;
-
   private final InstancePartitions _existingInstancePartitions;
 
   public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType,
       boolean minimizeDataMovement, @Nullable InstancePartitions existingInstancePartitions) {
     _tagPoolConfig = tagPoolConfig;
     _tableNameWithType = tableNameWithType;
-    _minimizeDataMovement = minimizeDataMovement;
+    _minimizeDataMovement = minimizeDataMovement && existingInstancePartitions != null;
     _existingInstancePartitions = existingInstancePartitions;
   }
 
@@ -104,7 +101,7 @@ public class InstanceTagPoolSelector {
       // Calculate the pools to select based on the selection config
       Set<Integer> pools = poolToInstanceConfigsMap.keySet();
       List<Integer> poolsToSelect = _tagPoolConfig.getPools();
-      if (poolsToSelect != null && !poolsToSelect.isEmpty()) {
+      if (!CollectionUtils.isEmpty(poolsToSelect)) {
         Preconditions.checkState(pools.containsAll(poolsToSelect), "Cannot find all instance pools configured: %s",
             poolsToSelect);
       } else {
@@ -123,45 +120,44 @@ public class InstanceTagPoolSelector {
           return poolToInstanceConfigsMap;
         }
 
+        // Select pools based on the table name hash to evenly distribute the tables
+        List<Integer> poolsInCluster = new ArrayList<>(pools);
+        int startIndex = Math.abs(tableNameHash % numPools);
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        if (_minimizeDataMovement && _existingInstancePartitions != null) {
-          Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
-          // Keep the same pool if it's already been used for the table.
+        if (_minimizeDataMovement) {
+          assert _existingInstancePartitions != null;
+          Map<Integer, Integer> poolToNumExistingInstancesMap = new TreeMap<>();
           int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
           int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
-          for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
-            for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+            for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
               List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
               for (String existingInstance : existingInstances) {
                 Integer existingPool = instanceToPoolMap.get(existingInstance);
                 if (existingPool != null) {
-                  if (!existingPoolsToExistingInstancesMap.containsKey(existingPool)) {
-                    existingPoolsToExistingInstancesMap.put(existingPool, new HashSet<>());
-                  }
-                  existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
-                      .add(existingInstance);
+                  poolToNumExistingInstancesMap.merge(existingPool, 1, Integer::sum);
                 }
               }
             }
           }
-
-          // Use a max heap to track the number of servers used for all the pools.
-          PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
-          for (int pool : pools) {
-            maxHeap.add(new Pairs.IntPair(existingPoolsToExistingInstancesMap.get(pool).size(), pool));
+          // Sort the pools based on the number of existing instances in the pool in descending order, then use the
+          // table name hash to break ties
+          // Triple stores (pool, numExistingInstances, poolIndex) for sorting
+          List<Triple<Integer, Integer, Integer>> triples = new ArrayList<>(numPools);
+          for (int i = 0; i < numPools; i++) {
+            int pool = poolsInCluster.get((startIndex + i) % numPools);
+            triples.add(Triple.of(pool, poolToNumExistingInstancesMap.getOrDefault(pool, 0), i));
           }
-
-          // Pick the pools from the max heap, so that data movement be minimized.
+          triples.sort((o1, o2) -> {
+            int result = Integer.compare(o2.getMiddle(), o1.getMiddle());
+            return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight());
+          });
           for (int i = 0; i < numPoolsToSelect; i++) {
-            Pairs.IntPair pair = maxHeap.remove();
-            poolsToSelect.add(pair.getRight());
+            poolsToSelect.add(triples.get(i).getLeft());
           }
-          LOGGER.info("The selected pools: " + poolsToSelect);
         } else {
-          // Select pools based on the table name hash to evenly distribute the tables
-          List<Integer> poolsInCluster = new ArrayList<>(pools);
           for (int i = 0; i < numPoolsToSelect; i++) {
-            poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+            poolsToSelect.add(poolsInCluster.get((startIndex + i) % numPools));
           }
         }
       }
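[Editor's note, not part of the patch] `InstanceTagPoolSelector` now orders candidate pools by descending count of existing instances and breaks ties by a rotation index derived from the table name hash. A minimal sketch of that ordering under those assumptions (illustrative names; the actual code uses commons-lang3 `Triple` rather than `int[]`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PoolOrderSketch {
  // Order pools by number of existing instances (descending); ties are broken by
  // rotation order starting at abs(tableNameHash % numPools), so tables with no
  // existing assignment still spread evenly across pools.
  static List<Integer> orderPools(List<Integer> pools, Map<Integer, Integer> numExistingInstances, int tableNameHash) {
    int numPools = pools.size();
    int startIndex = Math.abs(tableNameHash % numPools);
    // Each entry is {pool, numExistingInstances, rotationIndex}
    List<int[]> triples = new ArrayList<>(numPools);
    for (int i = 0; i < numPools; i++) {
      int pool = pools.get((startIndex + i) % numPools);
      triples.add(new int[] {pool, numExistingInstances.getOrDefault(pool, 0), i});
    }
    triples.sort((o1, o2) -> {
      int result = Integer.compare(o2[1], o1[1]); // more existing instances first
      return result != 0 ? result : Integer.compare(o1[2], o2[2]); // earlier rotation index wins
    });
    List<Integer> ordered = new ArrayList<>(numPools);
    for (int[] triple : triples) {
      ordered.add(triple[0]);
    }
    return ordered;
  }

  public static void main(String[] args) {
    // Pool 1 already hosts 3 instances of the table, pool 2 hosts 1, pool 0 none.
    List<Integer> ordered = orderPools(List.of(0, 1, 2), Map.of(1, 3, 2, 1), 7);
    System.out.println(ordered); // [1, 2, 0]
  }
}
```

Selecting the first `numPoolsToSelect` entries of the ordered list then favors pools that already hold the table's instances, which is what minimizes data movement.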
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index a6220c00a2..113d4e1649 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -50,7 +50,9 @@ import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.fail;
 
 
 public class InstanceAssignmentTest {
@@ -198,8 +200,8 @@ public class InstanceAssignmentTest {
     // r0: [i8, i1, i4]
     //      p0, p0, p1
     //      p1
-    // r1: [i9, i10, i5]
-    //      p0, p0, p1
+    // r1: [i9, i5, i10]
+    //      p0, p1, p0
     //      p1
     // r2: [i0, i3, i11]
     //      p0, p0, p1
@@ -217,7 +219,7 @@ public class InstanceAssignmentTest {
     assertEquals(instancePartitions.getInstances(1, 2),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 0));
 
-    // Add 2 more instances to the ZK and increase the number of instances per replica group from 2 to 3.
+    // Add 2 more instances to the ZK and increase the number of instances per partition from 2 to 3.
     for (int i = numInstances + 2; i < numInstances + 4; i++) {
       InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
       instanceConfig.addTag(OFFLINE_TAG);
@@ -233,34 +235,29 @@ public class InstanceAssignmentTest {
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 12 = 2
     // [i10, i11, i12, i13, i3, i4, i5, i11, i7, i8, i9, i0, i1]
-    // For r0, the candidate instances are [i12, i13, i4, i7, i8, i1].
-    //   For p0, since the existing assignment is [i8, i1], the next available instance from the candidates is i12.
-    //   For p1, the existing assignment is [i4, i8], the next available instance is also i12.
-    // r0: [i12, i4, i8, i1]
-    // For r1, the candidate instances become [i10, i13, i5, i7, i9].
-    //   For p0, since the existing assignment is [i9, i10], the next available instance is i13 (new instance).
-    //   For p1, the existing assignment is [i5, i9], the next available one from the candidates is i10, but since
-    //   i10 is already used in the former partition, it got added to the tail, so the next available one is i13.
-    // r1: [i10, i13, i5, i9]
-    // For r2, the candidate instances become [i11, i3, i7, i0].
-    //   For p0, the existing assignment is [i0, i3], the next available instance from the candidates is i11.
-    //   For p1, the existing assignment is [i11, i0], the next available instance from the candidates is i3, but
-    //   since i3 is already used in the former partition, it got appended to the tail, so the next available one is i7.
-    // r2: [i11, i3, i7, i0]
+    // r0: [i8, i1, i4, i12]
+    //      p0, p0, p1, p0
+    //      p1, p1
+    // r1: [i9, i5, i10, i13]
+    //      p0, p1, p0,  p0
+    //      p1,     p1
+    // r2: [i0, i3, i11, i7]
+    //      p0, p0, p1,  p0
+    //      p1, p1
     assertEquals(instancePartitions.getInstances(0, 0),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 12));
     assertEquals(instancePartitions.getInstances(1, 0),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 12));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 1));
     assertEquals(instancePartitions.getInstances(0, 1),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13));
     assertEquals(instancePartitions.getInstances(1, 1),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 13));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 10));
     assertEquals(instancePartitions.getInstances(0, 2),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 11));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 7));
     assertEquals(instancePartitions.getInstances(1, 2),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 7));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
 
-    // Reduce the number of instances per replica group from 3 to 2.
+    // Reduce the number of instances per partition from 3 to 2.
     numInstancesPerPartition = 2;
     tableConfig.getValidationConfig()
         .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerPartition));
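[Editor's note, not part of the patch] The expected assignments in the test comments above (e.g. `Math.abs("myTable_OFFLINE".hashCode()) % 12 = 2`) start iteration from a hash-derived rotation index. A tiny illustration of that rotation with a hypothetical helper:

```java
import java.util.ArrayList;
import java.util.List;

public class RotationSketch {
  // Rotate a list so iteration starts at abs(hash % size), mirroring how the
  // selectors spread different tables across instances and pools.
  static <T> List<T> rotate(List<T> items, int hash) {
    int n = items.size();
    int start = Math.abs(hash % n);
    List<T> rotated = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
      rotated.add(items.get((start + i) % n));
    }
    return rotated;
  }

  public static void main(String[] args) {
    // With hash 14 and 4 items, abs(14 % 4) = 2, so iteration starts at index 2.
    System.out.println(rotate(List.of("a", "b", "c", "d"), 14)); // [c, d, a, b]
  }
}
```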
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
index fdb6292f26..288b789aee 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
@@ -31,36 +31,66 @@ import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+
 
 public class InstanceReplicaGroupPartitionSelectorTest {
 
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
+  //@formatter:off
   private static final String INSTANCE_CONFIG_TEMPLATE =
-      "{\n" + "  \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
-          + "  \"simpleFields\": {\n" + "    \"HELIX_ENABLED\": \"true\",\n"
-          + "    \"HELIX_ENABLED_TIMESTAMP\": \"1688959934305\",\n"
-          + "    \"HELIX_HOST\": \"pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local\",\n"
-          + "    \"HELIX_PORT\": \"8098\",\n" + "    \"adminPort\": \"8097\",\n" + "    \"grpcPort\": \"8090\",\n"
-          + "    \"queryMailboxPort\": \"46347\",\n" + "    \"queryServerPort\": \"45031\",\n"
-          + "    \"shutdownInProgress\": \"false\"\n" + "  },\n" + "  \"mapFields\": {\n"
-          + "    \"SYSTEM_RESOURCE_INFO\": {\n" + "      \"numCores\": \"16\",\n"
-          + "      \"totalMemoryMB\": \"126976\",\n" + "      \"maxHeapSizeMB\": \"65536\"\n" + "    },\n"
-          + "    \"pool\": {\n" + "      \"DefaultTenant_OFFLINE\": \"${pool}\",\n"
-          + "      \"${poolName}\": \"${pool}\",\n" + "      \"AllReplicationGroups\": \"1\"\n" + "    }\n" + "  },\n"
-          + "  \"listFields\": {\n" + "    \"TAG_LIST\": [\n" + "      \"DefaultTenant_OFFLINE\",\n"
-          + "      \"DefaultTenant_REALTIME\",\n" + "      \"${poolName}\",\n" + "      \"AllReplicationGroups\"\n"
-          + "    ]\n" + "  }\n" + "}";
+      "{\n"
+    + "  \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+    + "  \"simpleFields\": {\n"
+    + "    \"HELIX_ENABLED\": \"true\",\n"
+    + "    \"HELIX_ENABLED_TIMESTAMP\": \"1688959934305\",\n"
+    + "    \"HELIX_HOST\": \"pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local\",\n"
+    + "    \"HELIX_PORT\": \"8098\",\n"
+    + "    \"adminPort\": \"8097\",\n"
+    + "    \"grpcPort\": \"8090\",\n"
+    + "    \"queryMailboxPort\": \"46347\",\n"
+    + "    \"queryServerPort\": \"45031\",\n"
+    + "    \"shutdownInProgress\": \"false\"\n"
+    + "  },\n"
+    + "  \"mapFields\": {\n"
+    + "    \"SYSTEM_RESOURCE_INFO\": {\n"
+    + "      \"numCores\": \"16\",\n"
+    + "      \"totalMemoryMB\": \"126976\",\n"
+    + "      \"maxHeapSizeMB\": \"65536\"\n"
+    + "    },\n"
+    + "    \"pool\": {\n"
+    + "      \"DefaultTenant_OFFLINE\": \"${pool}\",\n"
+    + "      \"${poolName}\": \"${pool}\",\n"
+    + "      \"AllReplicationGroups\": \"1\"\n"
+    + "    }\n"
+    + "  },\n"
+    + "  \"listFields\": {\n"
+    + "    \"TAG_LIST\": [\n"
+    + "      \"DefaultTenant_OFFLINE\",\n"
+    + "      \"DefaultTenant_REALTIME\",\n"
+    + "      \"${poolName}\",\n"
+    + "      \"AllReplicationGroups\"\n"
+    + "    ]\n"
+    + "  }\n"
+    + "}";
+  //@formatter:on
 
   @Test
   public void testPoolsWhenOneMorePoolAddedAndOneMoreReplicaGroupsNeeded()
       throws JsonProcessingException {
+    //@formatter:off
     String existingPartitionsJson =
-        "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
-            + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
-            + "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
-            + "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
-            + "        ]\n" + "      }\n" + "    }\n";
+        "{\n"
+      + "  \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+      + "  \"partitionToInstancesMap\": {\n"
+      + "    \"0_0\": [\n"
+      + "      \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+      + "      \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+      + "    ]\n"
+      + "  }\n"
+      + "}";
+    //@formatter:on
     InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class);
     InstanceReplicaGroupPartitionConfig config =
         new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null);
@@ -94,33 +124,47 @@ public class InstanceReplicaGroupPartitionSelectorTest {
 
     // Now that 1 more pool is added and 1 more RG is needed, a new set called "0_1" is generated,
     // and the instances from Pool 1 are assigned to this new replica group.
+    //@formatter:off
     String expectedInstancePartitions =
-        "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
-            + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
-            + "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
-            + "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
-            + "        ],\n" + "        \"0_1\": [\n"
-            + "          \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
-            + "          \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
-            + "        ]\n" + "      }\n" + "  }\n";
+        "{\n"
+      + "  \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+      + "  \"partitionToInstancesMap\": {\n"
+      + "    \"0_0\": [\n"
+      + "      \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+      + "      \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+      + "    ],\n"
+      + "    \"0_1\": [\n"
+      + "      \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+      + "      \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+      + "    ]\n"
+      + "  }\n"
+      + "}";
+    //@formatter:on
     InstancePartitions expectedPartitions =
         OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class);
-    assert assignedPartitions.equals(expectedPartitions);
+    assertEquals(assignedPartitions, expectedPartitions);
   }
 
   @Test
   public void testSelectPoolsWhenExistingReplicaGroupMapsToMultiplePools()
       throws JsonProcessingException {
     // The "rg0-2" instance used to belong to Pool 1, but now it belongs to Pool 0.
+    //@formatter:off
     String existingPartitionsJson =
-        "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
-            + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
-            + "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
-            + "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
-            + "        ],\n" + "        \"0_1\": [\n"
-            + "          \"Server_pinot-server-rg0-2.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
-            + "          \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
-            + "        ]\n" + "      }\n" + "  }\n";
+        "{\n"
+      + "  \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+      + "  \"partitionToInstancesMap\": {\n"
+      + "    \"0_0\": [\n"
+      + "      \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+      + "      \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+      + "    ],\n"
+      + "    \"0_1\": [\n"
+      + "      \"Server_pinot-server-rg0-2.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+      + "      \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+      + "    ]\n"
+      + "  }\n"
+      + "}";
+    //@formatter:on
     InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class);
     InstanceReplicaGroupPartitionConfig config =
         new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null);
@@ -150,17 +194,24 @@ public class InstanceReplicaGroupPartitionSelectorTest {
 
     // The "rg0-2" instance is replaced by "rg1-1" (which belongs to Pool 1), as "rg0-2" no longer belongs to Pool 1.
     // And "rg1-0" remains in the same position, as it has always been under Pool 1.
+    //@formatter:off
     String expectedInstancePartitions =
-        "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
-            + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
-            + "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
-            + "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
-            + "        ],\n" + "        \"0_1\": [\n"
-            + "          \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
-            + "          \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
-            + "        ]\n" + "      }\n" + "  }\n";
+        "{\n"
+      + "  \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+      + "  \"partitionToInstancesMap\": {\n"
+      + "    \"0_0\": [\n"
+      + "      \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+      + "      \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+      + "    ],\n"
+      + "    \"0_1\": [\n"
+      + "      \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+      + "      \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+      + "    ]\n"
+      + "  }\n"
+      + "}";
+    //@formatter:on
     InstancePartitions expectedPartitions =
         OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class);
-    assert assignedPartitions.equals(expectedPartitions);
+    assertEquals(assignedPartitions, expectedPartitions);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
index 45645387af..be18d35e50 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
@@ -30,11 +30,7 @@ public class Pairs {
   }
 
   public static Comparator<IntPair> intPairComparator() {
-    return new AscendingIntPairComparator(true);
-  }
-
-  public static Comparator<IntPair> intPairComparator(boolean ascending) {
-    return new AscendingIntPairComparator(ascending);
+    return new AscendingIntPairComparator();
   }
 
   public static class IntPair {
@@ -83,26 +79,13 @@ public class Pairs {
   }
 
   public static class AscendingIntPairComparator implements Comparator<IntPair> {
-    private boolean _ascending;
-
-    public AscendingIntPairComparator(boolean ascending) {
-      _ascending = ascending;
-    }
 
     @Override
     public int compare(IntPair pair1, IntPair pair2) {
       if (pair1._left != pair2._left) {
-        if (_ascending) {
-          return Integer.compare(pair1._left, pair2._left);
-        } else {
-          return Integer.compare(pair2._left, pair1._left);
-        }
+        return Integer.compare(pair1._left, pair2._left);
       } else {
-        if (_ascending) {
-          return Integer.compare(pair1._right, pair2._right);
-        } else {
-          return Integer.compare(pair2._right, pair1._right);
-        }
+        return Integer.compare(pair1._right, pair2._right);
       }
     }
   }
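
The boolean-flag comparator removed in the hunk above (and reintroduced by commit 04/05 further down) exists so the same pair type can drive both min and max heaps. As an aside, the same effect can be had with the JDK's comparator combinators; the `IntPair` record below is a simplified illustrative stand-in, not the actual `Pairs.IntPair` class:

```java
import java.util.Comparator;
import java.util.PriorityQueue;

public class IntPairComparators {
  // Simplified stand-in for Pairs.IntPair (illustrative only).
  record IntPair(int left, int right) { }

  // Ascending order on left, then right -- the default comparator.
  static Comparator<IntPair> ascending() {
    return Comparator.comparingInt(IntPair::left).thenComparingInt(IntPair::right);
  }

  // Descending order, as a max heap needs, without a boolean flag.
  static Comparator<IntPair> descending() {
    return ascending().reversed();
  }

  public static void main(String[] args) {
    PriorityQueue<IntPair> maxHeap = new PriorityQueue<>(descending());
    maxHeap.add(new IntPair(1, 5));
    maxHeap.add(new IntPair(3, 2));
    System.out.println(maxHeap.peek().left()); // 3: the largest pair surfaces first
  }
}
```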




(pinot) 04/05: Add logic to consider the case when instances are moved across pools

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 25b4c3a212b234106a92e30489b03f195d3eb60c
Author: jlli_LinkedIn <jl...@linkedin.com>
AuthorDate: Fri Dec 29 23:26:52 2023 -0800

    Add logic to consider the case when instances are moved across pools
---
 .../InstanceReplicaGroupPartitionSelector.java     | 75 +++++++++++++++++----
 .../instance/InstanceTagPoolSelector.java          | 38 ++++++-----
 .../InstanceReplicaGroupPartitionSelectorTest.java | 76 ++++++++++++++++++++--
 .../java/org/apache/pinot/spi/utils/Pairs.java     | 23 ++++++-
 4 files changed, 171 insertions(+), 41 deletions(-)
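
For readers skimming the diff below: the core idea of this commit is to order pools by how many of the table's existing instances they already host, and pick pools from a max heap so data movement is minimized. A minimal, self-contained sketch of that selection step (the class and method names here are illustrative, not Pinot APIs):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;

public class MaxHeapPoolSelection {

  // Pick numPoolsToSelect pools, preferring pools that already host the most
  // of the table's existing instances.
  static List<Integer> selectPools(Map<Integer, Set<String>> poolToExistingInstances,
      int numPoolsToSelect) {
    // Max heap over (existingInstanceCount, pool), descending by count.
    PriorityQueue<int[]> maxHeap =
        new PriorityQueue<>((a, b) -> Integer.compare(b[0], a[0]));
    for (Map.Entry<Integer, Set<String>> e : poolToExistingInstances.entrySet()) {
      maxHeap.add(new int[]{e.getValue().size(), e.getKey()});
    }
    List<Integer> selected = new ArrayList<>();
    for (int i = 0; i < numPoolsToSelect && !maxHeap.isEmpty(); i++) {
      selected.add(maxHeap.remove()[1]);
    }
    return selected;
  }

  public static void main(String[] args) {
    Map<Integer, Set<String>> stats = new TreeMap<>();
    stats.put(0, Set.of("rg0-0", "rg0-1", "rg0-2")); // 3 existing instances
    stats.put(1, Set.of("rg1-0"));                   // 1 existing instance
    stats.put(2, Set.of());                          // freshly added pool
    System.out.println(selectPools(stats, 2));       // [0, 1]
  }
}
```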

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index 79e95db7a6..505006f1d3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -74,6 +74,9 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
       int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
       Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive");
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+      Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
+      Map<Integer, Set<Integer>> existingPoolToExistingReplicaGroupIdsMap = new TreeMap<>();
+      Map<Integer, Set<String>> existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
       Map<String, Integer> instanceToPoolMap = new HashMap<>();
@@ -89,26 +92,70 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
       }
 
       if (_minimizeDataMovement && _existingInstancePartitions != null) {
-        // Keep the same pool for the replica group if it's already been used for the table.
+        // Collect the stats among the existing pools, existing replica groups, and existing instances.
         int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
         int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
-        int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
-        for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
-          boolean foundExistingReplicaGroup = false;
-          for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingReplicaGroup; partitionId++) {
+        for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
             List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
             for (String existingInstance : existingInstances) {
               Integer existingPool = instanceToPoolMap.get(existingInstance);
-              if (existingPool != null & pools.contains(existingPool)) {
-                poolToReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new ArrayList<>()).add(replicaGroupId);
-                replicaGroupIdToPoolMap.put(replicaGroupId, existingPool);
-                foundExistingReplicaGroup = true;
-                break;
+              if (existingPool != null) {
+                existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+                    .add(existingInstance);
+                existingPoolToExistingReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+                    .add(replicaGroupId);
+                existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
+                    .add(existingInstance);
+              }
+            }
+          }
+        }
+
+        // Use a max heap to track the number of servers used for the given pools,
+        // so that the pool with the most existing instances is considered first.
+        PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
+        for (int pool : pools) {
+          maxHeap.add(
+              new Pairs.IntPair(existingPoolsToExistingInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).size(),
+                  pool));
+        }
+
+        // Get the maximum number of replica groups per pool.
+        int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size();
+        // Given a pool number, assign the replica group that has the most existing instances.
+        // Repeat this process until the max number of replica groups per pool is reached.
+        while (!maxHeap.isEmpty()) {
+          Pairs.IntPair pair = maxHeap.remove();
+          int poolNumber = pair.getRight();
+          for (int i = 0; i < maxNumberOfReplicaGroupPerPool; i++) {
+            Set<Integer> existingReplicaGroups = existingPoolToExistingReplicaGroupIdsMap.get(poolNumber);
+            if (existingReplicaGroups == null || existingReplicaGroups.isEmpty()) {
+              continue;
+            }
+            int targetReplicaGroupId = -1;
+            int maxNumInstances = 0;
+            for (int existingReplicaGroupId : existingReplicaGroups) {
+              int numExistingInstances =
+                  existingReplicaGroupIdToExistingInstancesMap.getOrDefault(existingReplicaGroupId, new HashSet<>())
+                      .size();
+              if (numExistingInstances > maxNumInstances) {
+                maxNumInstances = numExistingInstances;
+                targetReplicaGroupId = existingReplicaGroupId;
               }
             }
+            // If no existing replica group can be targeted, a new replica group is chosen later instead.
+            if (targetReplicaGroupId > -1) {
+              poolToReplicaGroupIdsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(targetReplicaGroupId);
+              replicaGroupIdToPoolMap.put(targetReplicaGroupId, poolNumber);
+              // Clear the stats so that the same replica group won't be picked up again in later iteration.
+              existingReplicaGroupIdToExistingInstancesMap.get(targetReplicaGroupId).clear();
+            }
           }
         }
-        // Use a min heap to track the least frequently picked pool among all the pools
+
+        // If any new replica group is added, choose the least frequently picked pool.
+        // Use a min heap to track the least frequently picked pool among all the pools.
         PriorityQueue<Pairs.IntPair> minHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator());
         for (int pool : pools) {
           int numExistingReplicaGroups =
@@ -190,7 +237,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
         int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
         int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
 
-        Map<Integer, Set<String>> replicaGroupIdToExistingInstancesMap = new TreeMap<>();
+        existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
         // Step 1: find out the replica groups and their existing instances,
         //   so that these instances can be filtered out and won't be chosen for the other replica group.
         for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
@@ -202,7 +249,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
 
           for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
             List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
-            replicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
+            existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
                 .addAll(existingInstances);
           }
         }
@@ -215,7 +262,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
               otherReplicaGroupId < existingNumReplicaGroups && otherReplicaGroupId < numReplicaGroups;
               otherReplicaGroupId++) {
             if (replicaGroupId != otherReplicaGroupId) {
-              candidateInstances.removeAll(replicaGroupIdToExistingInstancesMap.get(otherReplicaGroupId));
+              candidateInstances.removeAll(existingReplicaGroupIdToExistingInstancesMap.get(otherReplicaGroupId));
             }
           }
           LinkedHashSet<String> chosenCandidateInstances = new LinkedHashSet<>();
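
The inner loop added above picks, for each pool, the existing replica group holding the most instances, and falls back to -1 when no candidate has any. A standalone sketch of that step (simplified hypothetical names, not the actual selector API):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

public class ReplicaGroupPicker {

  // Returns the candidate replica-group id with the most existing instances,
  // or -1 if none of the candidates has any (meaning a new replica group is needed).
  static int pickReplicaGroup(Set<Integer> candidateReplicaGroups,
      Map<Integer, Set<String>> replicaGroupToExistingInstances) {
    int target = -1;
    int maxNumInstances = 0;
    for (int rg : candidateReplicaGroups) {
      int num = replicaGroupToExistingInstances
          .getOrDefault(rg, Collections.emptySet()).size();
      if (num > maxNumInstances) {
        maxNumInstances = num;
        target = rg;
      }
    }
    return target;
  }

  public static void main(String[] args) {
    Map<Integer, Set<String>> stats = Map.of(
        0, Set.of("s1", "s2"),
        1, Set.of("s3"));
    System.out.println(pickReplicaGroup(Set.of(0, 1), stats)); // 0
  }
}
```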
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 2062a75209..940968432b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -22,16 +22,18 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
+import org.apache.pinot.spi.utils.Pairs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,36 +125,38 @@ public class InstanceTagPoolSelector {
 
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
         if (_minimizeDataMovement && _existingInstancePartitions != null) {
-          Set<Integer> existingPools = new TreeSet<>();
+          Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
           // Keep the same pool if it's already been used for the table.
           int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
           int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
           for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
-            boolean foundExistingPoolForReplicaGroup = false;
-            for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingPoolForReplicaGroup;
-                partitionId++) {
+            for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
               List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
               for (String existingInstance : existingInstances) {
                 Integer existingPool = instanceToPoolMap.get(existingInstance);
                 if (existingPool != null) {
-                  if (existingPools.add(existingPool)) {
-                    poolsToSelect.add(existingPool);
+                  if (!existingPoolsToExistingInstancesMap.containsKey(existingPool)) {
+                    existingPoolsToExistingInstancesMap.put(existingPool, new HashSet<>());
                   }
-                  foundExistingPoolForReplicaGroup = true;
-                  break;
+                  existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+                      .add(existingInstance);
                 }
               }
             }
           }
-          LOGGER.info("Keep the same pool: {} for table: {}", existingPools, _tableNameWithType);
-          // Pick a pool from remainingPools that isn't used before.
-          List<Integer> remainingPools = new ArrayList<>(pools);
-          remainingPools.removeAll(existingPools);
-          // Select from the remaining pools.
-          int remainingNumPoolsToSelect = numPoolsToSelect - poolsToSelect.size();
-          for (int i = 0; i < remainingNumPoolsToSelect; i++) {
-            poolsToSelect.add(remainingPools.remove(i % remainingPools.size()));
+
+          // Use a max heap to track the number of servers used for all the pools.
+          PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
+          for (int pool : pools) {
+            maxHeap.add(new Pairs.IntPair(existingPoolsToExistingInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).size(), pool));
+          }
+
+          // Pick the pools from the max heap so that data movement is minimized.
+          for (int i = 0; i < numPoolsToSelect; i++) {
+            Pairs.IntPair pair = maxHeap.remove();
+            poolsToSelect.add(pair.getRight());
           }
+          LOGGER.info("Selected pools: {} for table: {}", poolsToSelect, _tableNameWithType);
         } else {
           // Select pools based on the table name hash to evenly distribute the tables
           List<Integer> poolsInCluster = new ArrayList<>(pools);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
index 2fdef27796..fdb6292f26 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
@@ -34,6 +34,8 @@ import org.testng.annotations.Test;
 
 public class InstanceReplicaGroupPartitionSelectorTest {
 
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
   private static final String INSTANCE_CONFIG_TEMPLATE =
       "{\n" + "  \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
           + "  \"simpleFields\": {\n" + "    \"HELIX_ENABLED\": \"true\",\n"
@@ -51,15 +53,15 @@ public class InstanceReplicaGroupPartitionSelectorTest {
           + "    ]\n" + "  }\n" + "}";
 
   @Test
-  public void testSelectInstances() throws JsonProcessingException {
-    ObjectMapper objectMapper = new ObjectMapper();
+  public void testPoolsWhenOneMorePoolAddedAndOneMoreReplicaGroupsNeeded()
+      throws JsonProcessingException {
     String existingPartitionsJson =
         "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
             + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
             + "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
             + "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
             + "        ]\n" + "      }\n" + "    }\n";
-    InstancePartitions existing = objectMapper.readValue(existingPartitionsJson, InstancePartitions.class);
+    InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class);
     InstanceReplicaGroupPartitionConfig config =
         new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null);
 
@@ -68,8 +70,10 @@ public class InstanceReplicaGroupPartitionSelectorTest {
 
     String[] serverNames = {"rg0-0", "rg0-1", "rg1-0", "rg1-1"};
     String[] poolNumbers = {"0", "0", "1", "1"};
-    String[] poolNames = {"FirstHalfReplicationGroups", "FirstHalfReplicationGroups", "SecondHalfReplicationGroups",
-        "SecondHalfReplicationGroups"};
+    String[] poolNames = {
+        "FirstHalfReplicationGroups", "FirstHalfReplicationGroups", "SecondHalfReplicationGroups",
+        "SecondHalfReplicationGroups"
+    };
     Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>();
 
     for (int i = 0; i < serverNames.length; i++) {
@@ -81,13 +85,15 @@ public class InstanceReplicaGroupPartitionSelectorTest {
       StringSubstitutor substitutor = new StringSubstitutor(valuesMap);
       String resolvedString = substitutor.replace(INSTANCE_CONFIG_TEMPLATE);
 
-      ZNRecord znRecord = objectMapper.readValue(resolvedString, ZNRecord.class);
+      ZNRecord znRecord = OBJECT_MAPPER.readValue(resolvedString, ZNRecord.class);
       int poolNumber = Integer.parseInt(poolNumbers[i]);
       poolToInstanceConfigsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(new InstanceConfig(znRecord));
     }
     InstancePartitions assignedPartitions = new InstancePartitions("0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE");
     selector.selectInstances(poolToInstanceConfigsMap, assignedPartitions);
 
+    // Now that 1 more pool is added and 1 more RG is needed, a new set called "0_1" is generated,
+    // and the instances from Pool 1 are assigned to this new replica group.
     String expectedInstancePartitions =
         "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
             + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
@@ -98,7 +104,63 @@ public class InstanceReplicaGroupPartitionSelectorTest {
             + "          \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
             + "        ]\n" + "      }\n" + "  }\n";
     InstancePartitions expectedPartitions =
-        objectMapper.readValue(expectedInstancePartitions, InstancePartitions.class);
+        OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class);
+    assert assignedPartitions.equals(expectedPartitions);
+  }
+
+  @Test
+  public void testSelectPoolsWhenExistingReplicaGroupMapsToMultiplePools()
+      throws JsonProcessingException {
+    // The "rg0-2" instance used to belong to Pool 1, but now it belongs to Pool 0.
+    String existingPartitionsJson =
+        "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+            + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
+            + "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+            + "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+            + "        ],\n" + "        \"0_1\": [\n"
+            + "          \"Server_pinot-server-rg0-2.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+            + "          \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+            + "        ]\n" + "      }\n" + "  }\n";
+    InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class);
+    InstanceReplicaGroupPartitionConfig config =
+        new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null);
+
+    InstanceReplicaGroupPartitionSelector selector =
+        new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing, true);
+
+    String[] serverNames = {"rg0-0", "rg0-1", "rg0-2", "rg1-0", "rg1-1", "rg1-2"};
+    String[] poolNumbers = {"0", "0", "0", "1", "1", "1"};
+    Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>();
+
+    for (int i = 0; i < serverNames.length; i++) {
+      Map<String, String> valuesMap = new HashMap<>();
+      valuesMap.put("serverName", serverNames[i]);
+      valuesMap.put("pool", poolNumbers[i]);
+
+      StringSubstitutor substitutor = new StringSubstitutor(valuesMap);
+      String resolvedString = substitutor.replace(INSTANCE_CONFIG_TEMPLATE);
+
+      ZNRecord znRecord = OBJECT_MAPPER.readValue(resolvedString, ZNRecord.class);
+      int poolNumber = Integer.parseInt(poolNumbers[i]);
+      poolToInstanceConfigsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(new InstanceConfig(znRecord));
+    }
+
+    InstancePartitions assignedPartitions = new InstancePartitions("0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE");
+    selector.selectInstances(poolToInstanceConfigsMap, assignedPartitions);
+
+    // The "rg0-2" instance is replaced by "rg1-1" (which belongs to Pool 1), as "rg0-2" no longer belongs to Pool 1.
+    // And "rg1-0" remains in the same position, as it has always been under Pool 1.
+    String expectedInstancePartitions =
+        "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+            + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
+            + "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+            + "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+            + "        ],\n" + "        \"0_1\": [\n"
+            + "          \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+            + "          \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+            + "        ]\n" + "      }\n" + "  }\n";
+    InstancePartitions expectedPartitions =
+        OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class);
     assert assignedPartitions.equals(expectedPartitions);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
index be18d35e50..45645387af 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
@@ -30,7 +30,11 @@ public class Pairs {
   }
 
   public static Comparator<IntPair> intPairComparator() {
-    return new AscendingIntPairComparator();
+    return new AscendingIntPairComparator(true);
+  }
+
+  public static Comparator<IntPair> intPairComparator(boolean ascending) {
+    return new AscendingIntPairComparator(ascending);
   }
 
   public static class IntPair {
@@ -79,13 +83,26 @@ public class Pairs {
   }
 
   public static class AscendingIntPairComparator implements Comparator<IntPair> {
+    private boolean _ascending;
+
+    public AscendingIntPairComparator(boolean ascending) {
+      _ascending = ascending;
+    }
 
     @Override
     public int compare(IntPair pair1, IntPair pair2) {
       if (pair1._left != pair2._left) {
-        return Integer.compare(pair1._left, pair2._left);
+        if (_ascending) {
+          return Integer.compare(pair1._left, pair2._left);
+        } else {
+          return Integer.compare(pair2._left, pair1._left);
+        }
       } else {
-        return Integer.compare(pair1._right, pair2._right);
+        if (_ascending) {
+          return Integer.compare(pair1._right, pair2._right);
+        } else {
+          return Integer.compare(pair2._right, pair1._right);
+        }
       }
     }
   }
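
The patch above generalizes `AscendingIntPairComparator` to support descending order via a constructor flag. A standalone sketch of the resulting behavior follows; the `IntPair` and `IntPairComparator` classes here are local stand-ins for illustration, not the Pinot classes themselves:

```java
import java.util.Comparator;

// Standalone sketch of the patched comparator: compare on the left int first,
// falling back to the right int; negate the result for descending order.
class IntPair {
  final int left;
  final int right;

  IntPair(int left, int right) {
    this.left = left;
    this.right = right;
  }
}

class IntPairComparator implements Comparator<IntPair> {
  private final boolean _ascending;

  IntPairComparator(boolean ascending) {
    _ascending = ascending;
  }

  @Override
  public int compare(IntPair p1, IntPair p2) {
    int cmp = p1.left != p2.left
        ? Integer.compare(p1.left, p2.left)
        : Integer.compare(p1.right, p2.right);
    // Integer.compare only returns -1, 0, or 1, so plain negation is safe.
    return _ascending ? cmp : -cmp;
  }
}
```

For example, `list.sort(new IntPairComparator(false))` orders pairs by left value descending, breaking ties on the right value descending.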


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


(pinot) 02/05: Update pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 97ab0b7d4599fba3b85129982ed7132eeaf9ecf0
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Tue Nov 14 21:59:59 2023 -0800

    Update pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
    
    Co-authored-by: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
---
 .../helix/core/assignment/instance/InstancePartitionSelector.java     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
index 5f92db2426..335070b003 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
@@ -36,7 +36,9 @@ abstract class InstancePartitionSelector {
     _replicaGroupPartitionConfig = replicaGroupPartitionConfig;
     _tableNameWithType = tableNameWithType;
     _existingInstancePartitions = existingInstancePartitions;
-    _minimizeDataMovement = minimizeDataMovement;
+    // For backward compatibility, enable minimizeDataMovement when it is enabled either at the top level or at the
+    // instance partition selector level.
+    _minimizeDataMovement = minimizeDataMovement || replicaGroupPartitionConfig.isMinimizeDataMovement();
   }
 
   /**
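
The effect of the one-line change above is that the flag becomes an OR of the two configuration sources. A minimal sketch of that rule, assuming only the boolean semantics shown in the diff (the class and method names here are illustrative, not Pinot APIs):

```java
// Sketch of the backward-compatibility rule from the patch: data movement is
// minimized when the flag is set at EITHER the top level OR on the
// replica-group partition config.
final class MinimizeDataMovementRule {
  static boolean effective(boolean topLevelFlag, boolean partitionConfigFlag) {
    return topLevelFlag || partitionConfigFlag;
  }
}
```

This keeps older table configs working: tables that only set the flag on the replica-group partition config still get the minimize-data-movement behavior.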

