You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/11/30 01:56:27 UTC
[incubator-pinot] 01/01: Adding custom metadata props into both segment metadata properties file and zk metadata record
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch all_custom_metadata_props_in_segment_creation
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit af3ff4076ec166d1e9f3b036079c23b800e94899
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Sun Nov 29 17:55:50 2020 -0800
Adding custom metadata props into both segment metadata properties file and zk metadata record
---
.../helix/core/util/ZKMetadataUtils.java | 1 +
.../core/segment/creator/impl/V1Constants.java | 2 ++
.../segment/index/metadata/SegmentMetadata.java | 3 +++
.../index/metadata/SegmentMetadataImpl.java | 28 ++++++++++++++++++++++
.../batch/common/SegmentGenerationTaskRunner.java | 1 +
.../batch/hadoop/HadoopSegmentCreationMapper.java | 2 ++
.../spark/SparkSegmentGenerationJobRunner.java | 2 ++
.../standalone/SegmentGenerationJobRunner.java | 2 ++
.../spi/ingestion/batch/BatchConfigProperties.java | 1 +
.../batch/spec/SegmentGenerationTaskSpec.java | 26 ++++++++++++++++++++
10 files changed, 68 insertions(+)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
index 9cb4455..b4103a7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
@@ -48,6 +48,7 @@ public class ZKMetadataUtils {
offlineSegmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
offlineSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
offlineSegmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
+ offlineSegmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap());
// Extract column partition metadata (if any), and set it into segment ZK metadata.
Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index 0498f8c..0a09394 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -62,6 +62,8 @@ public class V1Constants {
public static final String DATETIME_COLUMNS = "segment.datetime.column.names";
public static final String SEGMENT_TOTAL_DOCS = "segment.total.docs";
public static final String SEGMENT_PADDING_CHARACTER = "segment.padding.character";
+
+ public static final String CUSTOM_SUBSET = "custom";
}
public static class Column {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
index d01406c..d409a65 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.segment.index.metadata;
import java.io.File;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.data.Schema;
@@ -107,4 +108,6 @@ public interface SegmentMetadata {
char getPaddingCharacter();
boolean close();
+
+ Map<String, String> getCustomMap();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
index 1245034..15ed544 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
@@ -44,6 +44,7 @@ import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -52,6 +53,7 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -104,6 +106,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
private int _totalDocs;
private long _segmentStartTime;
private long _segmentEndTime;
+ private Map<String, String> _customMap;
/**
* For segments on disk.
@@ -117,6 +120,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
_columnMetadataMap = new HashMap<>();
_allColumns = new HashSet<>();
_schema = new Schema();
+ _customMap = new HashMap<>();
init(segmentMetadataPropertiesConfiguration);
File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir);
@@ -160,6 +164,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
_allColumns = schema.getColumnNames();
_schema = schema;
_totalDocs = segmentMetadataPropertiesConfiguration.getInt(SEGMENT_TOTAL_DOCS);
+ _customMap = new HashMap<>();
}
public static PropertiesConfiguration getPropertiesConfiguration(File indexDir) {
@@ -263,6 +268,18 @@ public class SegmentMetadataImpl implements SegmentMetadata {
segmentMetadataPropertiesConfiguration.subset(StarTreeV2Constants.MetadataKey.getStarTreePrefix(i))));
}
}
+
+ // Set custom configs from metadata properties
+ setCustomConfigs(segmentMetadataPropertiesConfiguration, _customMap);
+ }
+
+ private static void setCustomConfigs(Configuration segmentMetadataPropertiesConfiguration, Map<String, String> customConfigsMap) {
+ Configuration customConfigs = segmentMetadataPropertiesConfiguration.subset(V1Constants.MetadataKeys.Segment.CUSTOM_SUBSET);
+ Iterator<String> customKeysIter = customConfigs.getKeys();
+ while (customKeysIter.hasNext()) {
+ String key = customKeysIter.next();
+ customConfigsMap.put(key, customConfigs.getString(key));
+ }
}
/**
@@ -427,6 +444,11 @@ public class SegmentMetadataImpl implements SegmentMetadata {
return false;
}
+ @Override
+ public Map<String, String> getCustomMap() {
+ return _customMap;
+ }
+
public List<StarTreeV2Metadata> getStarTreeV2MetadataList() {
return _starTreeV2MetadataList;
}
@@ -531,6 +553,12 @@ public class SegmentMetadataImpl implements SegmentMetadata {
segmentMetadata.put("creatorName", _creatorName);
segmentMetadata.put("paddingCharacter", String.valueOf(_paddingCharacter));
+ ObjectNode customConfigs = JsonUtils.newObjectNode();
+ for (String key : _customMap.keySet()) {
+ customConfigs.put(key, _customMap.get(key));
+ }
+ segmentMetadata.set("custom", customConfigs);
+
ArrayNode columnsMetadata = JsonUtils.newArrayNode();
for (String column : _allColumns) {
if (columnFilter != null && !columnFilter.contains(column)) {
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
index 9c80942..2bd0762 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
@@ -95,6 +95,7 @@ public class SegmentGenerationTaskRunner implements Serializable {
segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
segmentGeneratorConfig.setRecordReaderPath(_taskSpec.getRecordReaderSpec().getClassName());
segmentGeneratorConfig.setInputFilePath(_taskSpec.getInputFilePath());
+ segmentGeneratorConfig.setCustomProperties(_taskSpec.getCustomProperties());
//build segment
SegmentIndexCreationDriverImpl segmentIndexCreationDriver = new SegmentIndexCreationDriverImpl();
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
index c62c7dc..5b5d6f5 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
@@ -33,6 +33,7 @@ import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunne
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
@@ -139,6 +140,7 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
taskSpec.setSequenceId(idx);
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+ taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
// Start a thread that reports progress every minute during segment generation to prevent job getting killed
Thread progressReporterThread = new Thread(getProgressReporter(context));
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index 9b36a91..5be70f2 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -47,6 +47,7 @@ import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
@@ -303,6 +304,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
taskSpec.setSequenceId(idx);
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+ taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
String segmentName = taskRunner.run();
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index f60a7e3..fc2f9be 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -40,6 +40,7 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
@@ -186,6 +187,7 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
taskSpec.setTableConfig(tableConfig.toJsonNode());
taskSpec.setSequenceId(i);
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+ taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI);
_executorService.submit(() -> {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index 2ebda7b..5d276aa 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -39,6 +39,7 @@ public class BatchConfigProperties {
public static final String RECORD_READER_CONFIG_CLASS = "recordReader.config.className";
public static final String RECORD_READER_PROP_PREFIX = "recordReader.prop";
+ public static final String INPUT_DATA_FILE_URI_KEY = "input.data.file.uri";
/**
* Helper method to create a batch config property
*/
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
index 5455bfa..d19d306 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
@@ -20,6 +20,8 @@ package org.apache.pinot.spi.ingestion.batch.spec;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.pinot.spi.data.Schema;
@@ -28,6 +30,8 @@ import org.apache.pinot.spi.data.Schema;
* Note that this task creates a segment directory, not tar file.
*/
public class SegmentGenerationTaskSpec implements Serializable {
+ public static final String CUSTOM_SUBSET = "custom";
+ public static final String CUSTOM_PREFIX = CUSTOM_SUBSET + '.';
/**
* Table config to create segment
@@ -64,6 +68,11 @@ public class SegmentGenerationTaskSpec implements Serializable {
*/
private int _sequenceId;
+ /**
+ * Custom properties set into segment metadata
+ */
+ private Map<String, String> _customProperties = new HashMap<>();
+
public JsonNode getTableConfig() {
return _tableConfig;
}
@@ -119,4 +128,21 @@ public class SegmentGenerationTaskSpec implements Serializable {
public void setSequenceId(int sequenceId) {
_sequenceId = sequenceId;
}
+
+ public void setCustomProperty(String key, String value) {
+ if (!key.startsWith(CUSTOM_PREFIX)) {
+ key = CUSTOM_PREFIX + key;
+ }
+ _customProperties.put(key, value);
+ }
+
+ public void setCustomProperties(Map<String, String> customProperties) {
+ for (String key : customProperties.keySet()) {
+ setCustomProperty(key, customProperties.get(key));
+ }
+ }
+
+ public Map<String, String> getCustomProperties() {
+ return _customProperties;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org