You are viewing a plain-text version of this content. The link to its canonical version ("here") is not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/02 23:52:18 UTC

[incubator-pinot] 01/08: Controller side changes pseudo code

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 54ce986d49eb418a949c04d7d9832f81e4453d65
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Wed Dec 23 17:08:08 2020 -0800

    Controller side changes pseudo code
---
 .../helix/core/PinotHelixResourceManager.java      | 29 ++++++++
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 79 ++++++++++++++++++++++
 .../org/apache/pinot/spi/stream/Checkpoint.java    | 24 +++++++
 .../org/apache/pinot/spi/stream/FetchResult.java   | 27 ++++++++
 .../pinot/spi/stream/PartitionGroupConsumer.java   | 23 +++++++
 .../pinot/spi/stream/PartitionGroupMetadata.java   | 41 +++++++++++
 .../spi/stream/PartitionGroupMetadataList.java     | 30 ++++++++
 .../org/apache/pinot/spi/stream/StreamConfig.java  |  6 +-
 .../pinot/spi/stream/StreamConsumerFactory.java    |  9 ++-
 9 files changed, 266 insertions(+), 2 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 6b5168f..5afbe7e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -85,6 +85,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.CommonConstants.Helix;
 import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
 import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
@@ -122,7 +123,10 @@ import org.apache.pinot.spi.config.table.TenantConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -1332,6 +1336,10 @@ public class PinotHelixResourceManager {
         IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig));
     IdealState idealState = getTableIdealState(realtimeTableName);
 
+    if (streamConfig.isShardedConsumerType()) {
+      setupShardedRealtimeTable(streamConfig, idealState, realtimeTableConfig.getValidationConfig().getReplicasPerPartitionNumber());
+    }
+
     if (streamConfig.hasHighLevelConsumerType()) {
       if (idealState == null) {
         LOGGER.info("Initializing IdealState for HLC table: {}", realtimeTableName);
@@ -1364,6 +1372,27 @@ public class PinotHelixResourceManager {
     }
   }
 
+  /** Sets up segment ZK metadata and the ideal state for the partition groups of a SHARDED consumer type table.
+   * @param streamConfig stream config of the table, used to create the {@link StreamConsumerFactory}
+   * @param idealState current ideal state of the table (NOTE(review): may be null for a new table - confirm)
+   * @param numReplicas replicas per partition, taken from the table's validation config by the caller */
+  private void setupShardedRealtimeTable(StreamConfig streamConfig, IdealState idealState, int numReplicas) {
+    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+
+    // Get the current partition groups and their metadata; expected to be empty when the table is created.
+    List<PartitionGroupMetadata> currentPartitionGroupMetadataList = _pinotLLCRealtimeSegmentManager.getCurrentPartitionGroupMetadataList(idealState);
+
+    // Ask the stream for the new partition groups and their metadata, given the current ones.
+    // E.g. for a stream with 3 shards we may get groups [0], [1], [2] (for now, each group holds exactly one shard).
+    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+        streamConsumerFactory.getPartitionGroupMetadataList(currentPartitionGroupMetadataList);
+
+    // Set up segment ZK metadata and ideal state for all the newly found partition groups.
+    _pinotLLCRealtimeSegmentManager.setupNewPartitionGroups(newPartitionGroupMetadataList, numReplicas);
+  }
+
+
+
   private void ensurePropertyStoreEntryExistsForHighLevelConsumer(String realtimeTableName) {
     String propertyStorePath = ZKMetadataProvider.constructPropertyStorePathForResource(realtimeTableName);
     if (!_propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 00433b3..b459760 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -45,6 +45,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
@@ -75,10 +76,12 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.stream.PartitionOffsetFetcher;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
@@ -157,6 +160,82 @@ public class PinotLLCRealtimeSegmentManager {
     _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
   }
 
+  /**
+   * Handles the commit of a segment of a partition group; the committing segment will call this.
+   *
+   * Example: 3 shards grouped into partition groups (PGs) [0], [1], [2], and the segment of PG 0 is committing.
+   * First, its segment metadata is updated to DONE and its ideal state to ONLINE. The current partition group
+   * metadata list then contains [1], [2], the new list contains [0], [1], [2], and the PGs needing new segments are [0].
+   * @param realtimeTableName name of the realtime table whose segment is committing
+   * @param committingSegmentDescriptor describes the committing segment (not used yet by this pseudo code)
+   */
+  public void commitPartitionGroup(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
+    TableConfig realtimeTableConfig = getTableConfig(realtimeTableName);
+    StreamConfig streamConfig = new StreamConfig(realtimeTableName, IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig));
+    int numReplicas = realtimeTableConfig.getValidationConfig().getReplicasPerPartitionNumber();
+    IdealState idealState = getIdealState(realtimeTableName);
+
+    // TODO: update the status in the committing segment's ZK metadata to DONE
+    // ..
+
+    // TODO: update the ideal state of the committing segment to ONLINE
+    // ..
+
+    // Fetch the current partition groups, i.e. those actively CONSUMING ([1], [2] in the example above)
+    List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
+
+    // Get the new partition groups, honoring any groupings that are already consuming ([0], [1], [2])
+    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+        streamConsumerFactory.getPartitionGroupMetadataList(currentPartitionGroupMetadataList);
+
+    // TODO: from the above list, remove the partition groups that are already CONSUMING,
+    // i.e. newPartitionGroups - currentPartitionGroups; in the example, ([0], [1], [2]) - ([1], [2]) = ([0])
+    // .. NOTE(review): until this is implemented, the call below receives the full list, including CONSUMING groups
+
+    // Set up segment metadata and ideal state for the newly found partition groups
+    setupNewPartitionGroups(newPartitionGroupMetadataList, numReplicas);
+  }
+
+  public void setupIdealStateForConsuming(List<SegmentZKMetadata> segmentZKMetadata, int numReplicas) { // stub: no-op so far
+    // TODO: add every segment in the list to the ideal state with state CONSUMING, presumably using numReplicas replicas
+  }
+
+  public void persistSegmentMetadata(List<SegmentZKMetadata> segmentMetadata) { // stub: no-op so far
+    // TODO: persist the metadata of every new segment in the list to ZK
+  }
+
+  /**
+   * Builds the segment ZK metadata equivalent to each partition group in the given list (stub: returns an empty list).
+   */
+  public List<SegmentZKMetadata> constructSegmentMetadata(List<PartitionGroupMetadata> partitionGroupMetadataList) {
+    List<SegmentZKMetadata> segmentZKMetadata = new ArrayList<>();
+    // TODO: for each partition group, construct a SegmentZKMetadata object and add it to the list
+    return segmentZKMetadata;
+  }
+
+  /**
+   * Derives the currently CONSUMING partition groups from the segments in the given ideal state (stub: returns an empty list).
+   */
+  public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState) {
+    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
+    // TODO: from all segment names in the ideal state, find the unique partition groups
+    // NOTE(review): idealState may presumably be null for a brand-new table; handle that before dereferencing it
+    // TODO: create one PartitionGroupMetadata per group
+    return partitionGroupMetadataList;
+  }
+
+  public void setupNewPartitionGroups(List<PartitionGroupMetadata> newPartitionGroupMetadataList, int numReplicas) { // NOTE(review): sets up every group given - pass only groups not already CONSUMING
+    // Build segment ZK metadata for the new partition groups
+    List<SegmentZKMetadata> segmentMetadata = constructSegmentMetadata(newPartitionGroupMetadataList);
+
+    // Persist the new segments' metadata to ZK before they are added to the ideal state
+    persistSegmentMetadata(segmentMetadata);
+
+    // Add the new segments to the ideal state as CONSUMING
+    setupIdealStateForConsuming(segmentMetadata, numReplicas);
+  }
+
   public boolean getIsSplitCommitEnabled() {
     return _controllerConf.getAcceptSplitCommit();
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java
new file mode 100644
index 0000000..627c964
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+public interface Checkpoint { // opaque consumption position, used like a stream offset (start/end of a fetch)
+  byte[] serialize(); // encodes this checkpoint as bytes
+  Checkpoint deserialize(byte[] blob); // NOTE(review): instance method acting as a factory - decoding needs an existing instance
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
new file mode 100644
index 0000000..b0ed6e5
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.List;
+
+
+public interface FetchResult<T> { // result of one fetch: a batch of messages plus a checkpoint
+  Checkpoint getLastCheckpoint(); // presumably the checkpoint of the last fetched message - confirm inclusive vs. exclusive
+  List<T> getMessages(); // messages returned by the fetch
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
new file mode 100644
index 0000000..2f138c2
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+public interface PartitionGroupConsumer { // consumes messages from one partition group
+  FetchResult fetch(Checkpoint start, Checkpoint end, long timeout); // NOTE(review): raw FetchResult type; timeout unit unspecified (presumably ms)
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
new file mode 100644
index 0000000..779c167
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.List;
+
+
+public interface PartitionGroupMetadata { // metadata of a group of stream partitions (shards) consumed together
+
+  int getGroupId(); // id of this partition group
+
+  List<String> getPartitions(); // ids of the partitions (shards) in this group
+
+  Checkpoint getStartCheckpoint(); // similar to getStartOffset
+
+  Checkpoint getEndCheckpoint(); // similar to getEndOffset
+
+  void setStartCheckpoint(Checkpoint startCheckpoint); // NOTE(review): setters make implementations mutable
+
+  void setEndCheckpoint(Checkpoint endCheckpoint);
+
+  byte[] serialize(); // encodes this metadata as bytes
+
+  PartitionGroupMetadata deserialize(byte[] blob); // NOTE(review): instance method acting as a factory, as in Checkpoint
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java
new file mode 100644
index 0000000..1568d63
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.List;
+
+
+public interface PartitionGroupMetadataList { // NOTE(review): unused in this commit; StreamConsumerFactory uses List<PartitionGroupMetadata>
+
+  List<PartitionGroupMetadata> getMetadataList(); // all partition group metadata in this list
+
+  PartitionGroupMetadata getPartitionGroupMetadata(int index); // by list index - presumably not the group id; confirm
+
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index d343203..a3e359e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -41,7 +41,7 @@ public class StreamConfig {
    * The type of the stream consumer either HIGHLEVEL or LOWLEVEL. For backward compatibility, adding SIMPLE which is equivalent to LOWLEVEL
    */
   public enum ConsumerType {
-    HIGHLEVEL, LOWLEVEL
+    HIGHLEVEL, LOWLEVEL, SHARDED // NOTE(review): the Javadoc above still mentions only HIGHLEVEL and LOWLEVEL
   }
 
   public static final int DEFAULT_FLUSH_THRESHOLD_ROWS = 5_000_000;
@@ -273,6 +273,10 @@ public class StreamConfig {
     return _consumerTypes.contains(ConsumerType.LOWLEVEL);
   }
 
+  public boolean isShardedConsumerType() { // true only when SHARDED is the sole configured consumer type
+    return _consumerTypes.size() == 1 && _consumerTypes.get(0).equals(ConsumerType.SHARDED); // NOTE(review): unlike the contains() check above, SHARDED mixed with other types is rejected; == suffices for enums
+  }
+
   public String getConsumerFactoryClassName() {
     return _consumerFactoryClassName;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 27205c9..4db0fb1 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pinot.spi.stream;
 
+import java.util.List;
 import java.util.Set;
-import org.apache.pinot.spi.data.Schema;
 
 
 /**
@@ -73,4 +73,11 @@ public abstract class StreamConsumerFactory {
   public StreamPartitionMsgOffsetFactory createStreamMsgOffsetFactory() {
     return new LongMsgOffsetFactory();
   }
+
+  // Computes the new partition groups (groupings of shards and their consumption state) from the current ones.
+  public abstract List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+      List<PartitionGroupMetadata> currentPartitionGroupsMetadata); // NOTE(review): new abstract SPI method - every concrete subclass must now implement it; consider a default
+
+  // Creates a consumer that reads from the given partition group.
+  public abstract PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata); // NOTE(review): new abstract SPI method - every concrete subclass must now implement it
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org