You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "Jackie-Jiang (via GitHub)" <gi...@apache.org> on 2023/06/01 20:28:43 UTC

[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10463: Segment compaction for upsert real-time tables

Jackie-Jiang commented on code in PR #10463:
URL: https://github.com/apache/pinot/pull/10463#discussion_r1213629542


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java:
##########
@@ -39,14 +39,30 @@ private SegmentUtils() {
   @Nullable
   public static Integer getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName,
       HelixManager helixManager, @Nullable String partitionColumn) {
+    return getRealtimeSegmentPartitionId(
+        segmentName, realtimeTableName, helixManager, partitionColumn, null);
+  }
+
+  @Nullable
+  public static Integer getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName,
+      SegmentZKMetadata segmentZKMetadata) {
+    return getRealtimeSegmentPartitionId(segmentName, realtimeTableName, null, null, segmentZKMetadata);

Review Comment:
   This method should contain the code from lines 52-67 of the original code. Change the original method to keep lines 42-51 and then call this new method.



##########
pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java:
##########
@@ -50,4 +52,15 @@ public MinionStarter(PinotConfiguration config)
       throws Exception {
     init(config);
   }
+
+  public static void main(String[] args)

Review Comment:
   Remove this `main` method.



##########
pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java:
##########
@@ -255,6 +255,9 @@ public void start()
     updateInstanceConfigIfNeeded();
     minionContext.setHelixPropertyStore(_helixManager.getHelixPropertyStore());
 
+    minionContext.setClusterManagementTool(_helixManager.getClusterManagmentTool());

Review Comment:
   Let's directly put `_helixManager` into the `MinionContext`



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java:
##########
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.annotation.Nullable;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.Response;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
+import org.apache.pinot.common.utils.config.InstanceUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
+import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExecutor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskExecutor.class);
+  private int _numRecordsCompacted;
+
+  private class CompactedRecordReader implements RecordReader {
+    private final PinotSegmentRecordReader _pinotSegmentRecordReader;
+    private final ImmutableRoaringBitmap _validDocIds;
+    private int _docId = 0;
+    // Reusable generic row to store the next row to return
+    GenericRow _nextRow = new GenericRow();
+    // Flag to mark whether we need to fetch another row
+    boolean _nextRowReturned = true;
+    // Flag to mark whether all records have been iterated
+    boolean _finished = false;
+
+    CompactedRecordReader(File indexDir, ImmutableRoaringBitmap validDocIds) {
+      _pinotSegmentRecordReader = new PinotSegmentRecordReader();
+      _pinotSegmentRecordReader.init(indexDir, null, null);
+      _validDocIds = validDocIds;
+    }
+
+    @Override
+    public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) {
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (_finished) {
+        return false;
+      }
+
+      // If next row has not been returned, return true
+      if (!_nextRowReturned) {
+        return true;
+      }
+
+      // Try to get the next row to return
+      while (_pinotSegmentRecordReader.hasNext()) {

Review Comment:
   We can loop over the valid doc ids, and use `PinotSegmentRecordReader.getRecord(docId, _nextRow)` to read the record



##########
pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java:
##########
@@ -136,4 +136,13 @@ public static class SegmentGenerationAndPushTask {
     public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE =
         "SegmentGenerationAndPushTask.numConcurrentTasksPerInstance";
   }
+
+  public static class UpsertCompactionTask extends MergeTask {

Review Comment:
   Since we cannot do a merge, this should no longer extend `MergeTask`.



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java:
##########
@@ -77,7 +77,7 @@ public static HelixServerStarter startDefault()
     Map<String, Object> properties = new HashMap<>();
     int port = 8003;
     properties.put(Helix.CONFIG_OF_CLUSTER_NAME, "quickstart");
-    properties.put(Helix.CONFIG_OF_ZOOKEEPR_SERVER, "localhost:2191");
+    properties.put(Helix.CONFIG_OF_ZOOKEEPR_SERVER, "localhost:2122");

Review Comment:
   Let's not change the default port



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java:
##########
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.annotation.Nullable;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.Response;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
+import org.apache.pinot.common.utils.config.InstanceUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
+import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExecutor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskExecutor.class);
+  private int _numRecordsCompacted;
+
+  private class CompactedRecordReader implements RecordReader {
+    private final PinotSegmentRecordReader _pinotSegmentRecordReader;
+    private final ImmutableRoaringBitmap _validDocIds;
+    private int _docId = 0;
+    // Reusable generic row to store the next row to return
+    GenericRow _nextRow = new GenericRow();
+    // Flag to mark whether we need to fetch another row
+    boolean _nextRowReturned = true;
+    // Flag to mark whether all records have been iterated
+    boolean _finished = false;
+
+    CompactedRecordReader(File indexDir, ImmutableRoaringBitmap validDocIds) {
+      _pinotSegmentRecordReader = new PinotSegmentRecordReader();
+      _pinotSegmentRecordReader.init(indexDir, null, null);
+      _validDocIds = validDocIds;
+    }
+
+    @Override
+    public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) {
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (_finished) {
+        return false;
+      }
+
+      // If next row has not been returned, return true
+      if (!_nextRowReturned) {
+        return true;
+      }
+
+      // Try to get the next row to return
+      while (_pinotSegmentRecordReader.hasNext()) {
+        _nextRow.clear();
+        _nextRow = _pinotSegmentRecordReader.next(_nextRow);
+        _docId++;
+        if (_validDocIds.contains(_docId - 1)) {
+          _nextRowReturned = false;
+          return true;
+        } else {
+          _numRecordsCompacted++;
+        }
+      }
+
+      // Cannot find next row to return, return false
+      _finished = true;
+      return false;
+    }
+
+    @Override
+    public GenericRow next() {
+      return next(new GenericRow());
+    }
+
+    @Override
+    public GenericRow next(GenericRow reuse) {
+      Preconditions.checkState(!_nextRowReturned);
+      reuse.init(_nextRow);
+      _nextRowReturned = true;
+      return reuse;
+    }
+
+    @Override
+    public void rewind() {
+      _pinotSegmentRecordReader.rewind();
+      _nextRowReturned = true;
+      _finished = false;
+      _docId = 0;
+      _numRecordsCompacted = 0;
+    }
+
+    @Override
+    public void close()
+      throws IOException {
+      _pinotSegmentRecordReader.close();
+    }
+  }
+
+  @Override
+  protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File indexDir, File workingDir)
+    throws Exception {
+    _eventObserver.notifyProgress(pinotTaskConfig, "Compacting segment: " + indexDir);
+    Map<String, String> configs = pinotTaskConfig.getConfigs();
+    String taskType = pinotTaskConfig.getTaskType();
+    LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+    long startMillis = System.currentTimeMillis();
+
+    String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
+    TableConfig tableConfig = getTableConfig(tableNameWithType);
+    ImmutableRoaringBitmap validDocIds = getValidDocIds(tableNameWithType, configs);
+
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+    String segmentName = segmentMetadata.getName();
+    try (CompactedRecordReader compactedRecordReader = new CompactedRecordReader(indexDir, validDocIds)) {
+      SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir, tableConfig, segmentMetadata, segmentName);
+      SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+      driver.init(config, compactedRecordReader);
+      driver.build();
+      _eventObserver.notifyProgress(pinotTaskConfig,
+          "Number of records compacted: " + String.valueOf(_numRecordsCompacted));
+    }
+
+    File compactedSegmentFile = new File(workingDir, segmentName);
+
+    SegmentConversionResult result = new SegmentConversionResult.Builder()
+        .setFile(compactedSegmentFile)
+        .setTableNameWithType(tableNameWithType)
+        .setSegmentName(configs.get(MinionConstants.SEGMENT_NAME_KEY))
+        .build();
+
+    long endMillis = System.currentTimeMillis();
+    LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));
+
+    return result;
+  }
+
+  private static SegmentGeneratorConfig getSegmentGeneratorConfig(File workingDir, TableConfig tableConfig,
+      SegmentMetadataImpl segmentMetadata, String segmentName) {
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, segmentMetadata.getSchema());
+    config.setOutDir(workingDir.getPath());
+    config.setSegmentName(segmentName);
+    // Keep index creation time the same as original segment because both segments use the same raw data.
+    // This way, for REFRESH case, when new segment gets pushed to controller, we can use index creation time to
+    // identify if the new pushed segment has newer data than the existing one.
+    config.setCreationTime(String.valueOf(segmentMetadata.getIndexCreationTime()));
+
+    // The time column type info is not stored in the segment metadata.
+    // Keep segment start/end time to properly handle time column type other than EPOCH (e.g.SIMPLE_FORMAT).
+    if (segmentMetadata.getTimeInterval() != null) {
+      config.setTimeColumnName(tableConfig.getValidationConfig().getTimeColumnName());
+      config.setStartTime(Long.toString(segmentMetadata.getStartTime()));
+      config.setEndTime(Long.toString(segmentMetadata.getEndTime()));
+      config.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
+    }
+    return config;
+  }
+
+  private static ImmutableRoaringBitmap getValidDocIds(String tableNameWithType, Map<String, String> configs) {
+    String segmentName = configs.get(MinionConstants.SEGMENT_NAME_KEY);
+    String server = getServer(segmentName, tableNameWithType);
+
+    // get the url for the validDocIds for the server
+    InstanceConfig instanceConfig =
+        MINION_CONTEXT.getClusterManagementTool().getInstanceConfig(MINION_CONTEXT.getClusterName(), server);
+    String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
+    String url = String.format("%s/segments/%s/%s/validDocIds",
+        endpoint, tableNameWithType, segmentName);
+
+    // get the validDocIds from that server
+    Response response = ClientBuilder.newClient().target(url).request().get(Response.class);
+    Preconditions.checkState(response.getStatus() == Response.Status.OK.getStatusCode(),
+        "Unable to retrieve validDocIds from %s", url);
+    byte[] snapshot = response.readEntity(byte[].class);
+    ImmutableRoaringBitmap validDocIds = new ImmutableRoaringBitmap(ByteBuffer.wrap(snapshot));
+    return validDocIds;
+  }
+
+  @VisibleForTesting
+  public static String getServer(String segmentName, String tableNameWithType) {
+    String server = null;
+    HelixAdmin clusterManagementTool = MINION_CONTEXT.getClusterManagementTool();
+    IdealState idealState =
+        clusterManagementTool.getResourceIdealState(MINION_CONTEXT.getClusterName(), tableNameWithType);

Review Comment:
   We probably want to use the external view in case it has not converged yet.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java:
##########
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.BiMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
+import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@TaskGenerator
+public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskGenerator.class);
+  private static final String DEFAULT_BUFFER_PERIOD = "7d";
+  private static final String DEFAULT_INVALID_RECORDS_THRESHOLD = "100000";

Review Comment:
   (minor) Store it as a `long`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org