You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/12/01 03:57:03 UTC

[incubator-pinot] branch master updated: Adding custom metadata props into both segment metadata properties file and zk metadata record (#6299)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d6862a2  Adding custom metadata props into both segment metadata properties file and zk metadata record (#6299)
d6862a2 is described below

commit d6862a24caf82e3a548ef28dd8d920cb81595eb0
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Mon Nov 30 19:56:42 2020 -0800

    Adding custom metadata props into both segment metadata properties file and zk metadata record (#6299)
---
 .../common/metadata/SegmentZKMetadataTest.java     |  3 +++
 .../helix/core/util/ZKMetadataUtils.java           |  5 ++++
 .../core/segment/creator/impl/V1Constants.java     |  2 ++
 .../segment/index/metadata/SegmentMetadata.java    |  3 +++
 .../index/metadata/SegmentMetadataImpl.java        | 28 ++++++++++++++++++++++
 .../segment/index/SegmentMetadataImplTest.java     |  4 ++++
 .../batch/common/SegmentGenerationTaskRunner.java  |  1 +
 .../batch/hadoop/HadoopSegmentCreationMapper.java  |  2 ++
 .../spark/SparkSegmentGenerationJobRunner.java     |  2 ++
 .../standalone/SegmentGenerationJobRunner.java     |  2 ++
 .../spi/ingestion/batch/BatchConfigProperties.java |  1 +
 .../batch/spec/SegmentGenerationTaskSpec.java      | 26 ++++++++++++++++++++
 12 files changed, 79 insertions(+)

diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java
index ad6a487..8bf7b45 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.metadata;
 
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -201,6 +202,7 @@ public class SegmentZKMetadataTest {
     record.setSimpleField(CommonConstants.Segment.Offline.DOWNLOAD_URL, "http://localhost:8000/testTable_O_3000_4000");
     record.setLongField(CommonConstants.Segment.Offline.PUSH_TIME, 4000);
     record.setLongField(CommonConstants.Segment.Offline.REFRESH_TIME, 8000);
+    record.setMapField(CommonConstants.Segment.CUSTOM_MAP, ImmutableMap.of("k1", "v1", "k2", "v2"));
     return record;
   }
 
@@ -219,6 +221,7 @@ public class SegmentZKMetadataTest {
     offlineSegmentMetadata.setDownloadUrl("http://localhost:8000/testTable_O_3000_4000");
     offlineSegmentMetadata.setPushTime(4000);
     offlineSegmentMetadata.setRefreshTime(8000);
+    offlineSegmentMetadata.setCustomMap(ImmutableMap.of("k1", "v1", "k2", "v2"));
     return offlineSegmentMetadata;
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
index 9cb4455..a6661a4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.common.utils.CommonConstants.Segment.SegmentType;
 import org.apache.pinot.core.data.partition.PartitionFunction;
 import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
@@ -48,6 +49,10 @@ public class ZKMetadataUtils {
     offlineSegmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
     offlineSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
     offlineSegmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
+    SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
+        new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
+            offlineSegmentZKMetadata.getCustomMap());
+    offlineSegmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentMetadata.getCustomMap()));
 
     // Extract column partition metadata (if any), and set it into segment ZK metadata.
     Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index 0498f8c..0a09394 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -62,6 +62,8 @@ public class V1Constants {
       public static final String DATETIME_COLUMNS = "segment.datetime.column.names";
       public static final String SEGMENT_TOTAL_DOCS = "segment.total.docs";
       public static final String SEGMENT_PADDING_CHARACTER = "segment.padding.character";
+
+      public static final String CUSTOM_SUBSET = "custom";
     }
 
     public static class Column {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
index d01406c..d409a65 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.segment.index.metadata;
 
 import java.io.File;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.data.Schema;
@@ -107,4 +108,6 @@ public interface SegmentMetadata {
   char getPaddingCharacter();
 
   boolean close();
+
+  Map<String, String> getCustomMap();
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
index 1245034..15ed544 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
@@ -44,6 +44,7 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -52,6 +53,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
 
+import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -104,6 +106,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
   private int _totalDocs;
   private long _segmentStartTime;
   private long _segmentEndTime;
+  private Map<String, String> _customMap;
 
   /**
    * For segments on disk.
@@ -117,6 +120,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
     _columnMetadataMap = new HashMap<>();
     _allColumns = new HashSet<>();
     _schema = new Schema();
+    _customMap = new HashMap<>();
 
     init(segmentMetadataPropertiesConfiguration);
     File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir);
@@ -160,6 +164,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
     _allColumns = schema.getColumnNames();
     _schema = schema;
     _totalDocs = segmentMetadataPropertiesConfiguration.getInt(SEGMENT_TOTAL_DOCS);
+    _customMap = new HashMap<>();
   }
 
   public static PropertiesConfiguration getPropertiesConfiguration(File indexDir) {
@@ -263,6 +268,18 @@ public class SegmentMetadataImpl implements SegmentMetadata {
             segmentMetadataPropertiesConfiguration.subset(StarTreeV2Constants.MetadataKey.getStarTreePrefix(i))));
       }
     }
+
+    // Set custom configs from metadata properties
+    setCustomConfigs(segmentMetadataPropertiesConfiguration, _customMap);
+  }
+
+  private static void setCustomConfigs(Configuration segmentMetadataPropertiesConfiguration, Map<String, String> customConfigsMap) {
+    Configuration customConfigs = segmentMetadataPropertiesConfiguration.subset(V1Constants.MetadataKeys.Segment.CUSTOM_SUBSET);
+    Iterator<String> customKeysIter = customConfigs.getKeys();
+    while (customKeysIter.hasNext()) {
+      String key = customKeysIter.next();
+      customConfigsMap.put(key, customConfigs.getString(key));
+    }
   }
 
   /**
@@ -427,6 +444,11 @@ public class SegmentMetadataImpl implements SegmentMetadata {
     return false;
   }
 
+  @Override
+  public Map<String, String> getCustomMap() {
+    return _customMap;
+  }
+
   public List<StarTreeV2Metadata> getStarTreeV2MetadataList() {
     return _starTreeV2MetadataList;
   }
@@ -531,6 +553,12 @@ public class SegmentMetadataImpl implements SegmentMetadata {
     segmentMetadata.put("creatorName", _creatorName);
     segmentMetadata.put("paddingCharacter", String.valueOf(_paddingCharacter));
 
+    ObjectNode customConfigs = JsonUtils.newObjectNode();
+    for (String key : _customMap.keySet()) {
+      customConfigs.put(key, _customMap.get(key));
+    }
+    segmentMetadata.set("custom", customConfigs);
+
     ArrayNode columnsMetadata = JsonUtils.newArrayNode();
     for (String column : _allColumns) {
       if (columnFilter != null && !columnFilter.contains(column)) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/SegmentMetadataImplTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/SegmentMetadataImplTest.java
index a311bd2..1be7a3e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/SegmentMetadataImplTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/SegmentMetadataImplTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.segment.index;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
@@ -61,6 +62,7 @@ public class SegmentMetadataImplTest {
     // to have the time column values in allowed range. Until then, the check
     // is explicitly disabled
     config.setSkipTimeValueCheck(true);
+    config.setCustomProperties(ImmutableMap.of("custom.k1", "v1", "custom.k2", "v2"));
     final SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
     driver.init(config);
     driver.build();
@@ -88,6 +90,8 @@ public class SegmentMetadataImplTest {
     assertEquals(jsonMeta.get("endTimeMillis").asLong(), metadata.getTimeInterval().getEndMillis());
     assertEquals(jsonMeta.get("pushTimeMillis").asLong(), metadata.getPushTime());
     assertEquals(jsonMeta.get("refreshTimeMillis").asLong(), metadata.getPushTime());
+    assertEquals(jsonMeta.get("custom").get("k1").asText(), metadata.getCustomMap().get("k1"));
+    assertEquals(jsonMeta.get("custom").get("k2").asText(), metadata.getCustomMap().get("k2"));
 
     JsonNode jsonColumnsMeta = jsonMeta.get("columns");
     int numColumns = jsonColumnsMeta.size();
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
index 9c80942..2bd0762 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
@@ -95,6 +95,7 @@ public class SegmentGenerationTaskRunner implements Serializable {
     segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
     segmentGeneratorConfig.setRecordReaderPath(_taskSpec.getRecordReaderSpec().getClassName());
     segmentGeneratorConfig.setInputFilePath(_taskSpec.getInputFilePath());
+    segmentGeneratorConfig.setCustomProperties(_taskSpec.getCustomProperties());
 
     //build segment
     SegmentIndexCreationDriverImpl segmentIndexCreationDriver = new SegmentIndexCreationDriverImpl();
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
index c62c7dc..5b5d6f5 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
@@ -33,6 +33,7 @@ import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunne
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
@@ -139,6 +140,7 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
           .setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
       taskSpec.setSequenceId(idx);
       taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+      taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
 
       // Start a thread that reports progress every minute during segment generation to prevent job getting killed
       Thread progressReporterThread = new Thread(getProgressReporter(context));
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index 9b36a91..5be70f2 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -47,6 +47,7 @@ import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
 import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
@@ -303,6 +304,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
               SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
           taskSpec.setSequenceId(idx);
           taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+          taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
 
           SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
           String segmentName = taskRunner.run();
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index f60a7e3..fc2f9be 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -40,6 +40,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
 import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
@@ -186,6 +187,7 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
         taskSpec.setTableConfig(tableConfig.toJsonNode());
         taskSpec.setSequenceId(i);
         taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+        taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
 
         LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI);
         _executorService.submit(() -> {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index 2ebda7b..5d276aa 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -39,6 +39,7 @@ public class BatchConfigProperties {
   public static final String RECORD_READER_CONFIG_CLASS = "recordReader.config.className";
   public static final String RECORD_READER_PROP_PREFIX = "recordReader.prop";
 
+  public static final String INPUT_DATA_FILE_URI_KEY = "input.data.file.uri";
   /**
    * Helper method to create a batch config property
    */
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
index 5455bfa..d19d306 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
@@ -20,6 +20,8 @@ package org.apache.pinot.spi.ingestion.batch.spec;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -28,6 +30,8 @@ import org.apache.pinot.spi.data.Schema;
  * Note that this task creates a segment directory, not tar file.
  */
 public class SegmentGenerationTaskSpec implements Serializable {
+  public static final String CUSTOM_SUBSET = "custom";
+  public static final String CUSTOM_PREFIX = CUSTOM_SUBSET + '.';
 
   /**
    * Table config to create segment
@@ -64,6 +68,11 @@ public class SegmentGenerationTaskSpec implements Serializable {
    */
   private int _sequenceId;
 
+  /**
+   * Custom properties set into segment metadata
+   */
+  private Map<String, String> _customProperties = new HashMap<>();
+
   public JsonNode getTableConfig() {
     return _tableConfig;
   }
@@ -119,4 +128,21 @@ public class SegmentGenerationTaskSpec implements Serializable {
   public void setSequenceId(int sequenceId) {
     _sequenceId = sequenceId;
   }
+
+  public void setCustomProperty(String key, String value) {
+    if (!key.startsWith(CUSTOM_PREFIX)) {
+      key = CUSTOM_PREFIX + key;
+    }
+    _customProperties.put(key, value);
+  }
+
+  public void setCustomProperties(Map<String, String> customProperties) {
+    for (String key : customProperties.keySet()) {
+      setCustomProperty(key, customProperties.get(key));
+    }
+  }
+
+  public Map<String, String> getCustomProperties() {
+    return _customProperties;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org