You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/11/30 01:56:26 UTC

[incubator-pinot] branch all_custom_metadata_props_in_segment_creation created (now af3ff40)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch all_custom_metadata_props_in_segment_creation
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at af3ff40  Adding custom metadata props into both segment metadata properties file and zk metadata record

This branch includes the following new commits:

     new af3ff40  Adding custom metadata props into both segment metadata properties file and zk metadata record

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Adding custom metadata props into both segment metadata properties file and zk metadata record

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch all_custom_metadata_props_in_segment_creation
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit af3ff4076ec166d1e9f3b036079c23b800e94899
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Sun Nov 29 17:55:50 2020 -0800

    Adding custom metadata props into both segment metadata properties file and zk metadata record
---
 .../helix/core/util/ZKMetadataUtils.java           |  1 +
 .../core/segment/creator/impl/V1Constants.java     |  2 ++
 .../segment/index/metadata/SegmentMetadata.java    |  3 +++
 .../index/metadata/SegmentMetadataImpl.java        | 28 ++++++++++++++++++++++
 .../batch/common/SegmentGenerationTaskRunner.java  |  1 +
 .../batch/hadoop/HadoopSegmentCreationMapper.java  |  2 ++
 .../spark/SparkSegmentGenerationJobRunner.java     |  2 ++
 .../standalone/SegmentGenerationJobRunner.java     |  2 ++
 .../spi/ingestion/batch/BatchConfigProperties.java |  1 +
 .../batch/spec/SegmentGenerationTaskSpec.java      | 26 ++++++++++++++++++++
 10 files changed, 68 insertions(+)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
index 9cb4455..b4103a7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
@@ -48,6 +48,7 @@ public class ZKMetadataUtils {
     offlineSegmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
     offlineSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
     offlineSegmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
+    offlineSegmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap());
 
     // Extract column partition metadata (if any), and set it into segment ZK metadata.
     Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index 0498f8c..0a09394 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -62,6 +62,8 @@ public class V1Constants {
       public static final String DATETIME_COLUMNS = "segment.datetime.column.names";
       public static final String SEGMENT_TOTAL_DOCS = "segment.total.docs";
       public static final String SEGMENT_PADDING_CHARACTER = "segment.padding.character";
+
+      public static final String CUSTOM_SUBSET = "custom";
     }
 
     public static class Column {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
index d01406c..d409a65 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.segment.index.metadata;
 
 import java.io.File;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.data.Schema;
@@ -107,4 +108,6 @@ public interface SegmentMetadata {
   char getPaddingCharacter();
 
   boolean close();
+
+  Map<String, String> getCustomMap();
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
index 1245034..15ed544 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
@@ -44,6 +44,7 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -52,6 +53,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
 
+import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -104,6 +106,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
   private int _totalDocs;
   private long _segmentStartTime;
   private long _segmentEndTime;
+  private Map<String, String> _customMap;
 
   /**
    * For segments on disk.
@@ -117,6 +120,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
     _columnMetadataMap = new HashMap<>();
     _allColumns = new HashSet<>();
     _schema = new Schema();
+    _customMap = new HashMap<>();
 
     init(segmentMetadataPropertiesConfiguration);
     File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir);
@@ -160,6 +164,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
     _allColumns = schema.getColumnNames();
     _schema = schema;
     _totalDocs = segmentMetadataPropertiesConfiguration.getInt(SEGMENT_TOTAL_DOCS);
+    _customMap = new HashMap<>();
   }
 
   public static PropertiesConfiguration getPropertiesConfiguration(File indexDir) {
@@ -263,6 +268,18 @@ public class SegmentMetadataImpl implements SegmentMetadata {
             segmentMetadataPropertiesConfiguration.subset(StarTreeV2Constants.MetadataKey.getStarTreePrefix(i))));
       }
     }
+
+    // Set custom configs from metadata properties
+    setCustomConfigs(segmentMetadataPropertiesConfiguration, _customMap);
+  }
+
+  private static void setCustomConfigs(Configuration segmentMetadataPropertiesConfiguration, Map<String, String> customConfigsMap) {
+    Configuration customConfigs = segmentMetadataPropertiesConfiguration.subset(V1Constants.MetadataKeys.Segment.CUSTOM_SUBSET);
+    Iterator<String> customKeysIter = customConfigs.getKeys();
+    while (customKeysIter.hasNext()) {
+      String key = customKeysIter.next();
+      customConfigsMap.put(key, customConfigs.getString(key));
+    }
   }
 
   /**
@@ -427,6 +444,11 @@ public class SegmentMetadataImpl implements SegmentMetadata {
     return false;
   }
 
+  @Override
+  public Map<String, String> getCustomMap() {
+    return _customMap;
+  }
+
   public List<StarTreeV2Metadata> getStarTreeV2MetadataList() {
     return _starTreeV2MetadataList;
   }
@@ -531,6 +553,12 @@ public class SegmentMetadataImpl implements SegmentMetadata {
     segmentMetadata.put("creatorName", _creatorName);
     segmentMetadata.put("paddingCharacter", String.valueOf(_paddingCharacter));
 
+    ObjectNode customConfigs = JsonUtils.newObjectNode();
+    for (String key : _customMap.keySet()) {
+      customConfigs.put(key, _customMap.get(key));
+    }
+    segmentMetadata.set("custom", customConfigs);
+
     ArrayNode columnsMetadata = JsonUtils.newArrayNode();
     for (String column : _allColumns) {
       if (columnFilter != null && !columnFilter.contains(column)) {
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
index 9c80942..2bd0762 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
@@ -95,6 +95,7 @@ public class SegmentGenerationTaskRunner implements Serializable {
     segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
     segmentGeneratorConfig.setRecordReaderPath(_taskSpec.getRecordReaderSpec().getClassName());
     segmentGeneratorConfig.setInputFilePath(_taskSpec.getInputFilePath());
+    segmentGeneratorConfig.setCustomProperties(_taskSpec.getCustomProperties());
 
     //build segment
     SegmentIndexCreationDriverImpl segmentIndexCreationDriver = new SegmentIndexCreationDriverImpl();
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
index c62c7dc..5b5d6f5 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
@@ -33,6 +33,7 @@ import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunne
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
@@ -139,6 +140,7 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
           .setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
       taskSpec.setSequenceId(idx);
       taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+      taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
 
       // Start a thread that reports progress every minute during segment generation to prevent job getting killed
       Thread progressReporterThread = new Thread(getProgressReporter(context));
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index 9b36a91..5be70f2 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -47,6 +47,7 @@ import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
 import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
@@ -303,6 +304,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
               SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
           taskSpec.setSequenceId(idx);
           taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+          taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
 
           SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
           String segmentName = taskRunner.run();
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index f60a7e3..fc2f9be 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -40,6 +40,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
 import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
@@ -186,6 +187,7 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
         taskSpec.setTableConfig(tableConfig.toJsonNode());
         taskSpec.setSequenceId(i);
         taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+        taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
 
         LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI);
         _executorService.submit(() -> {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index 2ebda7b..5d276aa 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -39,6 +39,7 @@ public class BatchConfigProperties {
   public static final String RECORD_READER_CONFIG_CLASS = "recordReader.config.className";
   public static final String RECORD_READER_PROP_PREFIX = "recordReader.prop";
 
+  public static final String INPUT_DATA_FILE_URI_KEY = "input.data.file.uri";
   /**
    * Helper method to create a batch config property
    */
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
index 5455bfa..d19d306 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
@@ -20,6 +20,8 @@ package org.apache.pinot.spi.ingestion.batch.spec;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -28,6 +30,8 @@ import org.apache.pinot.spi.data.Schema;
  * Note that this task creates a segment directory, not tar file.
  */
 public class SegmentGenerationTaskSpec implements Serializable {
+  public static final String CUSTOM_SUBSET = "custom";
+  public static final String CUSTOM_PREFIX = CUSTOM_SUBSET + '.';
 
   /**
    * Table config to create segment
@@ -64,6 +68,11 @@ public class SegmentGenerationTaskSpec implements Serializable {
    */
   private int _sequenceId;
 
+  /**
+   * Custom properties set into segment metadata
+   */
+  private Map<String, String> _customProperties = new HashMap<>();
+
   public JsonNode getTableConfig() {
     return _tableConfig;
   }
@@ -119,4 +128,21 @@ public class SegmentGenerationTaskSpec implements Serializable {
   public void setSequenceId(int sequenceId) {
     _sequenceId = sequenceId;
   }
+
+  public void setCustomProperty(String key, String value) {
+    if (!key.startsWith(CUSTOM_PREFIX)) {
+      key = CUSTOM_PREFIX + key;
+    }
+    _customProperties.put(key, value);
+  }
+
+  public void setCustomProperties(Map<String, String> customProperties) {
+    for (String key : customProperties.keySet()) {
+      setCustomProperty(key, customProperties.get(key));
+    }
+  }
+
+  public Map<String, String> getCustomProperties() {
+    return _customProperties;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org