You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/02 23:52:18 UTC
[incubator-pinot] 01/08: Controller side changes pseudo code
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 54ce986d49eb418a949c04d7d9832f81e4453d65
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Wed Dec 23 17:08:08 2020 -0800
Controller side changes pseudo code
---
.../helix/core/PinotHelixResourceManager.java | 29 ++++++++
.../realtime/PinotLLCRealtimeSegmentManager.java | 79 ++++++++++++++++++++++
.../org/apache/pinot/spi/stream/Checkpoint.java | 24 +++++++
.../org/apache/pinot/spi/stream/FetchResult.java | 27 ++++++++
.../pinot/spi/stream/PartitionGroupConsumer.java | 23 +++++++
.../pinot/spi/stream/PartitionGroupMetadata.java | 41 +++++++++++
.../spi/stream/PartitionGroupMetadataList.java | 30 ++++++++
.../org/apache/pinot/spi/stream/StreamConfig.java | 6 +-
.../pinot/spi/stream/StreamConsumerFactory.java | 9 ++-
9 files changed, 266 insertions(+), 2 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 6b5168f..5afbe7e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -85,6 +85,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.CommonConstants.Helix;
import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
@@ -122,7 +123,10 @@ import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -1332,6 +1336,10 @@ public class PinotHelixResourceManager {
IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig));
IdealState idealState = getTableIdealState(realtimeTableName);
+ if (streamConfig.isShardedConsumerType()) {
+ setupShardedRealtimeTable(streamConfig, idealState, realtimeTableConfig.getValidationConfig().getReplicasPerPartitionNumber());
+ }
+
if (streamConfig.hasHighLevelConsumerType()) {
if (idealState == null) {
LOGGER.info("Initializing IdealState for HLC table: {}", realtimeTableName);
@@ -1364,6 +1372,27 @@ public class PinotHelixResourceManager {
}
}
+  /**
+   * Sets up a realtime table that uses the sharded consumer type: looks up the partition groups of the stream and
+   * hands them to the LLC realtime segment manager, which sets up their segment zk metadata and ideal state.
+   *
+   * @param streamConfig stream config from which the {@link StreamConsumerFactory} is created
+   * @param idealState ideal state of the table, from which the current partition groups are read
+   * @param numReplicas replica count passed on to the setup of the new partition groups
+   */
+  private void setupShardedRealtimeTable(StreamConfig streamConfig, IdealState idealState, int numReplicas) {
+    StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+
+    // partition groups that are already known - this will be empty when creating the table
+    List<PartitionGroupMetadata> existingGroups =
+        _pinotLLCRealtimeSegmentManager.getCurrentPartitionGroupMetadataList(idealState);
+
+    // ask the stream for the new grouping (e.g. a table with 3 shards, one shard per group, gives [0], [1], [2])
+    // and set up segment zk metadata and ideal state for the groups it reports
+    _pinotLLCRealtimeSegmentManager.setupNewPartitionGroups(
+        consumerFactory.getPartitionGroupMetadataList(existingGroups), numReplicas);
+  }
+
+
+
private void ensurePropertyStoreEntryExistsForHighLevelConsumer(String realtimeTableName) {
String propertyStorePath = ZKMetadataProvider.constructPropertyStorePathForResource(realtimeTableName);
if (!_propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 00433b3..b459760 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -45,6 +45,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
@@ -75,10 +76,12 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.PartitionOffsetFetcher;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
@@ -157,6 +160,82 @@ public class PinotLLCRealtimeSegmentManager {
_flushThresholdUpdateManager = new FlushThresholdUpdateManager();
}
+ /**
+ * The committing segment will call this.
+ *
+ * For example, say we have 3 shards, grouped into PartitionGroups as [0], [1], [2]
+ * Now segment of PG (partition group) 0 is committing. First, we'll update the metadata to DONE, and ideal state to ONLINE
+ * Then, the currentPartitionGroupMetadata list will contain - [1], [2]
+ * The newPartitionGroupMetadata list will contain - [0], [1], [2]
+ * We then get the set of PGs for which new segments need to be made - [0]
+ *
+ * NOTE(review): this is pseudo code - every step marked with ".." in the body is a placeholder without an implementation.
+ *
+ * @param realtimeTableName name of the realtime table, used to load the table config and the ideal state
+ * @param committingSegmentDescriptor descriptor of the committing segment; the code written so far does not read it
+ */
+ public void commitPartitionGroup(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
+ TableConfig realtimeTableConfig = getTableConfig(realtimeTableName);
+ StreamConfig streamConfig = new StreamConfig(realtimeTableName, IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig));
+ int numReplicas = realtimeTableConfig.getValidationConfig().getReplicasPerPartitionNumber();
+ IdealState idealState = getIdealState(realtimeTableName);
+
+ // update status in segment metadata to DONE
+ // ..
+
+ // update Ideal State for this segment to ONLINE
+ // ..
+
+ // fetch current partition groups (which are actively CONSUMING - from example above, [1], [2])
+ List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
+
+ // get new partition groups (honor any groupings which are already consuming - [0], [1], [2])
+ StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+ streamConsumerFactory.getPartitionGroupMetadataList(currentPartitionGroupMetadataList);
+
+ // from the above list, remove the partition groups which are already CONSUMING
+ // i.e. newPartitionGroups - currentPartitionGroups. Therefore, ([0], [1], [2]) - ([1], [2]) = ([0])
+ // ..
+
+ // setup segment metadata and ideal state for the new found partition groups
+ // NOTE(review): while the removal step above is still a placeholder, the unfiltered list is what gets passed here
+ setupNewPartitionGroups(newPartitionGroupMetadataList, numReplicas);
+ }
+
+ /**
+ * Placeholder for adding the given segments to the ideal state in CONSUMING state.
+ * NOTE(review): pseudo code - the body contains no statements yet, so calling this currently does nothing.
+ *
+ * @param segmentZKMetadata zk metadata of the segments to add to the ideal state
+ * @param numReplicas replica count supplied by the caller; not used until the body is implemented
+ */
+ public void setupIdealStateForConsuming(List<SegmentZKMetadata> segmentZKMetadata, int numReplicas) {
+ // add all segments from the list to ideal state, with state CONSUMING
+ }
+
+ /**
+ * Placeholder for writing the given segment metadata to zk.
+ * NOTE(review): pseudo code - the body contains no statements yet, so calling this currently does nothing.
+ *
+ * @param segmentMetadata zk metadata of the new segments to persist
+ */
+ public void persistSegmentMetadata(List<SegmentZKMetadata> segmentMetadata) {
+ // persist new segment metadata from list to zk
+ }
+
+ /**
+ * Using the list of partition group metadata, create a list of equivalent segment zk metadata
+ *
+ * NOTE(review): pseudo code - the per-group construction is still a placeholder, so the returned list is always empty.
+ *
+ * @param partitionGroupMetadataList partition groups to create segment zk metadata for (not read yet)
+ * @return a newly created {@link ArrayList}; currently always empty
+ */
+ public List<SegmentZKMetadata> constructSegmentMetadata(List<PartitionGroupMetadata> partitionGroupMetadataList) {
+ List<SegmentZKMetadata> segmentZKMetadata = new ArrayList<>();
+ // for each partition group construct a segment zk metadata object
+ return segmentZKMetadata;
+ }
+
+ /**
+ * Using the ideal state, return a list of the current partition groups
+ *
+ * NOTE(review): pseudo code - both steps in the body are placeholders, so the ideal state is never read and the
+ * returned list is always empty.
+ *
+ * @param idealState ideal state of the table (not read yet)
+ * @return a newly created {@link ArrayList}; currently always empty
+ */
+ public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState) {
+ List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
+ // from all segment names in the ideal state, find unique groups
+
+ // create a PartitionGroupMetadata, one for each group
+ return partitionGroupMetadataList;
+ }
+
+  /**
+   * Sets up segments for newly found partition groups in three steps: constructs the segment zk metadata for the
+   * groups, persists that metadata, and then passes it on for the ideal state setup.
+   *
+   * @param newPartitionGroupMetadataList partition groups for which segments need to be set up
+   * @param numReplicas replica count passed on to {@link #setupIdealStateForConsuming(List, int)}
+   */
+  public void setupNewPartitionGroups(List<PartitionGroupMetadata> newPartitionGroupMetadataList, int numReplicas) {
+    List<SegmentZKMetadata> newSegments = constructSegmentMetadata(newPartitionGroupMetadataList);
+    // the metadata is persisted before the ideal state is touched
+    persistSegmentMetadata(newSegments);
+    setupIdealStateForConsuming(newSegments, numReplicas);
+  }
+
public boolean getIsSplitCommitEnabled() {
return _controllerConf.getAcceptSplitCommit();
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java
new file mode 100644
index 0000000..627c964
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+/**
+ * A checkpoint that can be converted to and from a byte array.
+ * Checkpoints are used as the start/end markers of a {@code PartitionGroupMetadata} and as the range arguments of
+ * {@code PartitionGroupConsumer#fetch}.
+ */
+public interface Checkpoint {
+ /** Returns this checkpoint as a byte array. */
+ byte[] serialize();
+ /**
+ * Creates a checkpoint from a byte array - presumably one produced by {@link #serialize()} (to be confirmed).
+ * NOTE(review): this is an instance method, so an existing Checkpoint is needed in order to deserialize one.
+ */
+ Checkpoint deserialize(byte[] blob);
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
new file mode 100644
index 0000000..b0ed6e5
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.List;
+
+
+/**
+ * Result of a fetch: a list of messages together with a checkpoint.
+ *
+ * @param <T> type of the messages
+ */
+public interface FetchResult<T> {
+ /** Returns the last checkpoint of this result - presumably the position the fetch reached (to be confirmed). */
+ Checkpoint getLastCheckpoint();
+ /** Returns the messages of this result. */
+ List<T> getMessages();
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
new file mode 100644
index 0000000..2f138c2
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+/**
+ * Consumer which consumes from a partition group of a stream.
+ */
+public interface PartitionGroupConsumer {
+
+  /**
+   * Fetches messages for the range described by the two checkpoints.
+   *
+   * @param start checkpoint marking the start of the range
+   * @param end checkpoint marking the end of the range
+   * @param timeout time limit for the fetch - TODO(review): document the unit (presumably milliseconds)
+   * @return the fetched messages and the last checkpoint; the message type is left open with a wildcard so that the
+   *         generic {@code FetchResult} is not used as a raw type
+   */
+  FetchResult<?> fetch(Checkpoint start, Checkpoint end, long timeout);
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
new file mode 100644
index 0000000..779c167
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.List;
+
+
+/**
+ * Metadata of a partition group: the id of the group, the partitions it contains, and a start and end checkpoint.
+ * The metadata can be converted to and from a byte array.
+ */
+public interface PartitionGroupMetadata {
+
+ /** Returns the id of this partition group. */
+ int getGroupId();
+
+ /** Returns the partitions that belong to this group, as strings. */
+ List<String> getPartitions();
+
+ /** Returns the start checkpoint of this group. */
+ Checkpoint getStartCheckpoint(); // similar to getStartOffset
+
+ /** Returns the end checkpoint of this group. */
+ Checkpoint getEndCheckpoint(); // similar to getEndOffset
+
+ /** Sets the start checkpoint of this group. */
+ void setStartCheckpoint(Checkpoint startCheckpoint);
+
+ /** Sets the end checkpoint of this group. */
+ void setEndCheckpoint(Checkpoint endCheckpoint);
+
+ /** Returns this metadata as a byte array. */
+ byte[] serialize();
+
+ /**
+ * Creates partition group metadata from a byte array - presumably one produced by {@link #serialize()} (to be
+ * confirmed).
+ * NOTE(review): this is an instance method, so an existing PartitionGroupMetadata is needed in order to deserialize one.
+ */
+ PartitionGroupMetadata deserialize(byte[] blob);
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java
new file mode 100644
index 0000000..1568d63
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.List;
+
+
+/**
+ * A collection of {@link PartitionGroupMetadata}, available as a whole list or one entry at a time.
+ */
+public interface PartitionGroupMetadataList {
+
+ /** Returns all partition group metadata entries. */
+ List<PartitionGroupMetadata> getMetadataList();
+
+ /**
+ * Returns a single partition group metadata entry.
+ * NOTE(review): presumably {@code index} is a position in {@link #getMetadataList()} rather than a group id - confirm.
+ */
+ PartitionGroupMetadata getPartitionGroupMetadata(int index);
+
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index d343203..a3e359e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -41,7 +41,7 @@ public class StreamConfig {
* The type of the stream consumer either HIGHLEVEL or LOWLEVEL. For backward compatibility, adding SIMPLE which is equivalent to LOWLEVEL
*/
public enum ConsumerType {
- HIGHLEVEL, LOWLEVEL
+ HIGHLEVEL, LOWLEVEL, SHARDED
}
public static final int DEFAULT_FLUSH_THRESHOLD_ROWS = 5_000_000;
@@ -273,6 +273,10 @@ public class StreamConfig {
return _consumerTypes.contains(ConsumerType.LOWLEVEL);
}
+  /**
+   * Tells whether SHARDED is the one and only consumer type configured for the stream.
+   *
+   * @return true if exactly one consumer type is configured and that type is {@link ConsumerType#SHARDED}
+   */
+  public boolean isShardedConsumerType() {
+    if (_consumerTypes.size() != 1) {
+      return false;
+    }
+    return _consumerTypes.get(0).equals(ConsumerType.SHARDED);
+  }
+
public String getConsumerFactoryClassName() {
return _consumerFactoryClassName;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 27205c9..4db0fb1 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -18,8 +18,8 @@
*/
package org.apache.pinot.spi.stream;
+import java.util.List;
import java.util.Set;
-import org.apache.pinot.spi.data.Schema;
/**
@@ -73,4 +73,11 @@ public abstract class StreamConsumerFactory {
public StreamPartitionMsgOffsetFactory createStreamMsgOffsetFactory() {
return new LongMsgOffsetFactory();
}
+
+ /**
+ * Takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new
+ * state.
+ *
+ * @param currentPartitionGroupsMetadata metadata of the partition groups as currently known to the caller
+ * @return metadata of the partition groups in the new state; going by the comments at the controller-side call
+ * sites, this is expected to include the groups that are already being consumed (to be confirmed)
+ */
+ public abstract List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+ List<PartitionGroupMetadata> currentPartitionGroupsMetadata);
+
+ /**
+ * Creates a consumer which consumes from a partition group.
+ *
+ * @param metadata metadata of the partition group to consume from
+ * @return a consumer for that partition group
+ */
+ public abstract PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org