You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this plain-text rendering.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/12/01 03:57:03 UTC
[incubator-pinot] branch master updated: Adding custom metadata props into both segment metadata properties file and zk metadata record (#6299)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d6862a2 Adding custom metadata props into both segment metadata properties file and zk metadata record (#6299)
d6862a2 is described below
commit d6862a24caf82e3a548ef28dd8d920cb81595eb0
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Mon Nov 30 19:56:42 2020 -0800
Adding custom metadata props into both segment metadata properties file and zk metadata record (#6299)
---
.../common/metadata/SegmentZKMetadataTest.java | 3 +++
.../helix/core/util/ZKMetadataUtils.java | 5 ++++
.../core/segment/creator/impl/V1Constants.java | 2 ++
.../segment/index/metadata/SegmentMetadata.java | 3 +++
.../index/metadata/SegmentMetadataImpl.java | 28 ++++++++++++++++++++++
.../segment/index/SegmentMetadataImplTest.java | 4 ++++
.../batch/common/SegmentGenerationTaskRunner.java | 1 +
.../batch/hadoop/HadoopSegmentCreationMapper.java | 2 ++
.../spark/SparkSegmentGenerationJobRunner.java | 2 ++
.../standalone/SegmentGenerationJobRunner.java | 2 ++
.../spi/ingestion/batch/BatchConfigProperties.java | 1 +
.../batch/spec/SegmentGenerationTaskSpec.java | 26 ++++++++++++++++++++
12 files changed, 79 insertions(+)
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java
index ad6a487..8bf7b45 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.metadata;
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
@@ -201,6 +202,7 @@ public class SegmentZKMetadataTest {
record.setSimpleField(CommonConstants.Segment.Offline.DOWNLOAD_URL, "http://localhost:8000/testTable_O_3000_4000");
record.setLongField(CommonConstants.Segment.Offline.PUSH_TIME, 4000);
record.setLongField(CommonConstants.Segment.Offline.REFRESH_TIME, 8000);
+ record.setMapField(CommonConstants.Segment.CUSTOM_MAP, ImmutableMap.of("k1", "v1", "k2", "v2"));
return record;
}
@@ -219,6 +221,7 @@ public class SegmentZKMetadataTest {
offlineSegmentMetadata.setDownloadUrl("http://localhost:8000/testTable_O_3000_4000");
offlineSegmentMetadata.setPushTime(4000);
offlineSegmentMetadata.setRefreshTime(8000);
+ offlineSegmentMetadata.setCustomMap(ImmutableMap.of("k1", "v1", "k2", "v2"));
return offlineSegmentMetadata;
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
index 9cb4455..a6661a4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.utils.CommonConstants.Segment.SegmentType;
import org.apache.pinot.core.data.partition.PartitionFunction;
import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
@@ -48,6 +49,10 @@ public class ZKMetadataUtils {
offlineSegmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
offlineSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
offlineSegmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
+ SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
+ new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
+ offlineSegmentZKMetadata.getCustomMap());
+ offlineSegmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentMetadata.getCustomMap()));
// Extract column partition metadata (if any), and set it into segment ZK metadata.
Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index 0498f8c..0a09394 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -62,6 +62,8 @@ public class V1Constants {
public static final String DATETIME_COLUMNS = "segment.datetime.column.names";
public static final String SEGMENT_TOTAL_DOCS = "segment.total.docs";
public static final String SEGMENT_PADDING_CHARACTER = "segment.padding.character";
+
+ public static final String CUSTOM_SUBSET = "custom";
}
public static class Column {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
index d01406c..d409a65 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.segment.index.metadata;
import java.io.File;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.data.Schema;
@@ -107,4 +108,6 @@ public interface SegmentMetadata {
char getPaddingCharacter();
boolean close();
+
+ Map<String, String> getCustomMap();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
index 1245034..15ed544 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
@@ -44,6 +44,7 @@ import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -52,6 +53,7 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -104,6 +106,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
private int _totalDocs;
private long _segmentStartTime;
private long _segmentEndTime;
+ private Map<String, String> _customMap;
/**
* For segments on disk.
@@ -117,6 +120,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
_columnMetadataMap = new HashMap<>();
_allColumns = new HashSet<>();
_schema = new Schema();
+ _customMap = new HashMap<>();
init(segmentMetadataPropertiesConfiguration);
File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir);
@@ -160,6 +164,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
_allColumns = schema.getColumnNames();
_schema = schema;
_totalDocs = segmentMetadataPropertiesConfiguration.getInt(SEGMENT_TOTAL_DOCS);
+ _customMap = new HashMap<>();
}
public static PropertiesConfiguration getPropertiesConfiguration(File indexDir) {
@@ -263,6 +268,18 @@ public class SegmentMetadataImpl implements SegmentMetadata {
segmentMetadataPropertiesConfiguration.subset(StarTreeV2Constants.MetadataKey.getStarTreePrefix(i))));
}
}
+
+ // Set custom configs from metadata properties
+ setCustomConfigs(segmentMetadataPropertiesConfiguration, _customMap);
+ }
+
+ private static void setCustomConfigs(Configuration segmentMetadataPropertiesConfiguration, Map<String, String> customConfigsMap) {
+ Configuration customConfigs = segmentMetadataPropertiesConfiguration.subset(V1Constants.MetadataKeys.Segment.CUSTOM_SUBSET);
+ Iterator<String> customKeysIter = customConfigs.getKeys();
+ while (customKeysIter.hasNext()) {
+ String key = customKeysIter.next();
+ customConfigsMap.put(key, customConfigs.getString(key));
+ }
}
/**
@@ -427,6 +444,11 @@ public class SegmentMetadataImpl implements SegmentMetadata {
return false;
}
+ @Override
+ public Map<String, String> getCustomMap() {
+ return _customMap;
+ }
+
public List<StarTreeV2Metadata> getStarTreeV2MetadataList() {
return _starTreeV2MetadataList;
}
@@ -531,6 +553,12 @@ public class SegmentMetadataImpl implements SegmentMetadata {
segmentMetadata.put("creatorName", _creatorName);
segmentMetadata.put("paddingCharacter", String.valueOf(_paddingCharacter));
+ ObjectNode customConfigs = JsonUtils.newObjectNode();
+ for (String key : _customMap.keySet()) {
+ customConfigs.put(key, _customMap.get(key));
+ }
+ segmentMetadata.set("custom", customConfigs);
+
ArrayNode columnsMetadata = JsonUtils.newArrayNode();
for (String column : _allColumns) {
if (columnFilter != null && !columnFilter.contains(column)) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/SegmentMetadataImplTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/SegmentMetadataImplTest.java
index a311bd2..1be7a3e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/SegmentMetadataImplTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/SegmentMetadataImplTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.segment.index;
import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@@ -61,6 +62,7 @@ public class SegmentMetadataImplTest {
// to have the time column values in allowed range. Until then, the check
// is explicitly disabled
config.setSkipTimeValueCheck(true);
+ config.setCustomProperties(ImmutableMap.of("custom.k1", "v1", "custom.k2", "v2"));
final SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
driver.init(config);
driver.build();
@@ -88,6 +90,8 @@ public class SegmentMetadataImplTest {
assertEquals(jsonMeta.get("endTimeMillis").asLong(), metadata.getTimeInterval().getEndMillis());
assertEquals(jsonMeta.get("pushTimeMillis").asLong(), metadata.getPushTime());
assertEquals(jsonMeta.get("refreshTimeMillis").asLong(), metadata.getPushTime());
+ assertEquals(jsonMeta.get("custom").get("k1").asText(), metadata.getCustomMap().get("k1"));
+ assertEquals(jsonMeta.get("custom").get("k2").asText(), metadata.getCustomMap().get("k2"));
JsonNode jsonColumnsMeta = jsonMeta.get("columns");
int numColumns = jsonColumnsMeta.size();
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
index 9c80942..2bd0762 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
@@ -95,6 +95,7 @@ public class SegmentGenerationTaskRunner implements Serializable {
segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
segmentGeneratorConfig.setRecordReaderPath(_taskSpec.getRecordReaderSpec().getClassName());
segmentGeneratorConfig.setInputFilePath(_taskSpec.getInputFilePath());
+ segmentGeneratorConfig.setCustomProperties(_taskSpec.getCustomProperties());
//build segment
SegmentIndexCreationDriverImpl segmentIndexCreationDriver = new SegmentIndexCreationDriverImpl();
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
index c62c7dc..5b5d6f5 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
@@ -33,6 +33,7 @@ import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunne
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
@@ -139,6 +140,7 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
taskSpec.setSequenceId(idx);
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+ taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
// Start a thread that reports progress every minute during segment generation to prevent job getting killed
Thread progressReporterThread = new Thread(getProgressReporter(context));
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index 9b36a91..5be70f2 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -47,6 +47,7 @@ import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
@@ -303,6 +304,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
taskSpec.setSequenceId(idx);
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+ taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
String segmentName = taskRunner.run();
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index f60a7e3..fc2f9be 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -40,6 +40,7 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
@@ -186,6 +187,7 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
taskSpec.setTableConfig(tableConfig.toJsonNode());
taskSpec.setSequenceId(i);
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+ taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI);
_executorService.submit(() -> {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index 2ebda7b..5d276aa 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -39,6 +39,7 @@ public class BatchConfigProperties {
public static final String RECORD_READER_CONFIG_CLASS = "recordReader.config.className";
public static final String RECORD_READER_PROP_PREFIX = "recordReader.prop";
+ public static final String INPUT_DATA_FILE_URI_KEY = "input.data.file.uri";
/**
* Helper method to create a batch config property
*/
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
index 5455bfa..d19d306 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
@@ -20,6 +20,8 @@ package org.apache.pinot.spi.ingestion.batch.spec;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.pinot.spi.data.Schema;
@@ -28,6 +30,8 @@ import org.apache.pinot.spi.data.Schema;
* Note that this task creates a segment directory, not tar file.
*/
public class SegmentGenerationTaskSpec implements Serializable {
+ public static final String CUSTOM_SUBSET = "custom";
+ public static final String CUSTOM_PREFIX = CUSTOM_SUBSET + '.';
/**
* Table config to create segment
@@ -64,6 +68,11 @@ public class SegmentGenerationTaskSpec implements Serializable {
*/
private int _sequenceId;
+ /**
+ * Custom properties set into segment metadata
+ */
+ private Map<String, String> _customProperties = new HashMap<>();
+
public JsonNode getTableConfig() {
return _tableConfig;
}
@@ -119,4 +128,21 @@ public class SegmentGenerationTaskSpec implements Serializable {
public void setSequenceId(int sequenceId) {
_sequenceId = sequenceId;
}
+
+ public void setCustomProperty(String key, String value) {
+ if (!key.startsWith(CUSTOM_PREFIX)) {
+ key = CUSTOM_PREFIX + key;
+ }
+ _customProperties.put(key, value);
+ }
+
+ public void setCustomProperties(Map<String, String> customProperties) {
+ for (String key : customProperties.keySet()) {
+ setCustomProperty(key, customProperties.get(key));
+ }
+ }
+
+ public Map<String, String> getCustomProperties() {
+ return _customProperties;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org