You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/11/17 03:25:48 UTC

[incubator-pinot] 01/01: Adding config utils to apply environment variables and apply it to table config

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch pinot_conf_with_env_var
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 4b44e3d853fa9aa4f1a65cbf9cd185bcf8c431d3
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Mon Nov 16 19:25:17 2020 -0800

    Adding config utils to apply environment variables and apply it to table config
---
 .../pinot/common/metadata/ZKMetadataProvider.java  |   5 +-
 .../org/apache/pinot/spi/config/ConfigUtils.java   |  72 +++++++++++
 .../apache/pinot/spi/config/ConfigUtilsTest.java   | 138 +++++++++++++++++++++
 3 files changed, 214 insertions(+), 1 deletion(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 9fa56fa..7cce1eb 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -36,6 +36,8 @@ import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+import org.apache.pinot.spi.config.ConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
@@ -219,7 +221,8 @@ public class ZKMetadataProvider {
       return null;
     }
     try {
-      return TableConfigUtils.fromZNRecord(znRecord);
+      TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
+      return (TableConfig) ConfigUtils.applyConfigWithEnvVariables(tableConfig);
     } catch (Exception e) {
       LOGGER.error("Caught exception while getting table configuration for table: {}", tableNameWithType, e);
       return null;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/ConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/ConfigUtils.java
new file mode 100644
index 0000000..29a9516
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/ConfigUtils.java
@@ -0,0 +1,100 @@
+package org.apache.pinot.spi.config;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.JsonNodeType;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Utilities for resolving environment variable placeholders in JSON based configs.
+ *
+ * <p>A string value that starts with <code>${</code> and ends with <code>}</code> is replaced with the value of
+ * the environment variable named in between. Strings that do not both start and end that way are left unchanged.
+ */
+public class ConfigUtils {
+  // Looked up once, when this class is initialized.
+  private static final Map<String, String> ENVIRONMENT_VARIABLES = System.getenv();
+  private static final String PLACEHOLDER_PREFIX = "${";
+  private static final String PLACEHOLDER_SUFFIX = "}";
+
+  private ConfigUtils() {
+    // Static helpers only; not meant to be instantiated.
+  }
+
+  /**
+   * Apply environment variables to any given BaseJsonConfig.
+   *
+   * <p>Placeholders are resolved on the tree returned by {@code config.toJsonNode()}, and the resolved tree is then
+   * converted back to the runtime class of {@code config} through {@code JsonUtils.jsonNodeToObject}.
+   *
+   * @param config config whose string values may be environment variable placeholders
+   * @return Config with environment variable applied.
+   * @throws RuntimeException if a placeholder names an environment variable that is not set, or if the resolved JSON
+   *     cannot be converted back to the config class
+   */
+  public static <T extends BaseJsonConfig> T applyConfigWithEnvVariables(T config) {
+    JsonNode jsonNode;
+    try {
+      jsonNode = applyConfigWithEnvVariables(config.toJsonNode());
+    } catch (RuntimeException e) {
+      throw new RuntimeException(String
+          .format("Unable to apply environment variables on json config class [%s].", config.getClass().getName()), e);
+    }
+    try {
+      // Unchecked, but the conversion targets the runtime class of config, so a T is expected back.
+      @SuppressWarnings("unchecked")
+      T resolvedConfig = (T) JsonUtils.jsonNodeToObject(jsonNode, config.getClass());
+      return resolvedConfig;
+    } catch (IOException e) {
+      // The resolved JSON is deliberately kept out of the message: it can hold secrets read from the environment,
+      // and callers are likely to log this exception.
+      throw new RuntimeException(String
+          .format("Unable to read JsonConfig to class [%s] after applying environment variables.",
+              config.getClass().getName()), e);
+    }
+  }
+
+  /**
+   * Resolves placeholders in the given JSON tree. Object fields and array elements are rewritten in place; node
+   * types other than object, array and string are returned as they are.
+   *
+   * @param jsonNode root of the (sub)tree to process
+   * @return the given node, or a new text node carrying the environment value when the node itself is a placeholder
+   * @throws RuntimeException if a placeholder names an environment variable that is not set
+   */
+  private static JsonNode applyConfigWithEnvVariables(JsonNode jsonNode) {
+    switch (jsonNode.getNodeType()) {
+      case OBJECT:
+        Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();
+        while (fields.hasNext()) {
+          Map.Entry<String, JsonNode> field = fields.next();
+          field.setValue(applyConfigWithEnvVariables(field.getValue()));
+        }
+        return jsonNode;
+      case ARRAY:
+        ArrayNode arrayNode = (ArrayNode) jsonNode;
+        for (int i = 0; i < arrayNode.size(); i++) {
+          arrayNode.set(i, applyConfigWithEnvVariables(arrayNode.get(i)));
+        }
+        return jsonNode;
+      case STRING:
+        String text = jsonNode.asText();
+        if (!text.startsWith(PLACEHOLDER_PREFIX) || !text.endsWith(PLACEHOLDER_SUFFIX)) {
+          return jsonNode;
+        }
+        String envVarKey = text.substring(PLACEHOLDER_PREFIX.length(), text.length() - PLACEHOLDER_SUFFIX.length());
+        String envVarValue = ENVIRONMENT_VARIABLES.get(envVarKey);
+        if (envVarValue == null) {
+          throw new RuntimeException("Missing environment Variable: " + text);
+        }
+        return JsonNodeFactory.instance.textNode(envVarValue);
+      default:
+        return jsonNode;
+    }
+  }
+}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java
new file mode 100644
index 0000000..37edd8b
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java
@@ -0,0 +1,157 @@
+package org.apache.pinot.spi.config;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests for {@link ConfigUtils}.
+ */
+public class ConfigUtilsTest {
+
+  /**
+   * Builds an IndexingConfig whose load mode and two stream properties are environment variable placeholders, then
+   * checks that the placeholders are resolved and that the other settings read back with the values that were set.
+   */
+  @Test
+  public void testIndexing()
+      throws Exception {
+    IndexingConfig indexingConfig = new IndexingConfig();
+    indexingConfig.setLoadMode("${LOAD_MODE}");
+    indexingConfig.setAggregateMetrics(true);
+    List<String> invertedIndexColumns = Arrays.asList("a", "b", "c");
+    indexingConfig.setInvertedIndexColumns(invertedIndexColumns);
+    List<String> sortedColumn = Arrays.asList("d", "e", "f");
+    indexingConfig.setSortedColumn(sortedColumn);
+    List<String> onHeapDictionaryColumns = Arrays.asList("x", "y", "z");
+    indexingConfig.setOnHeapDictionaryColumns(onHeapDictionaryColumns);
+    List<String> bloomFilterColumns = Arrays.asList("a", "b");
+    indexingConfig.setBloomFilterColumns(bloomFilterColumns);
+    Map<String, String> noDictionaryConfig = new HashMap<>();
+    noDictionaryConfig.put("a", "SNAPPY");
+    noDictionaryConfig.put("b", "PASS_THROUGH");
+    indexingConfig.setNoDictionaryConfig(noDictionaryConfig);
+    List<String> varLengthDictionaryColumns = Arrays.asList("a", "x", "z");
+    indexingConfig.setVarLengthDictionaryColumns(varLengthDictionaryColumns);
+
+    String streamType = "fakeStream";
+    String topic = "fakeTopic";
+    String consumerType = "simple";
+    String tableName = "fakeTable_REALTIME";
+    String consumerFactoryClass = "org.apache.pinot.plugin.stream.kafka20.StreamConsumerFactory";
+    String decoderClass = "org.apache.pinot.plugin.inputformat.avro.KafkaAvroMessageDecoder";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
+    streamConfigMap
+        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
+            topic);
+    streamConfigMap
+        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+            consumerType);
+    streamConfigMap.put(StreamConfigProperties
+            .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
+        consumerFactoryClass);
+    streamConfigMap
+        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+            decoderClass);
+    streamConfigMap
+        .put(StreamConfigProperties.constructStreamProperty(streamType, "aws.accessKey"), "${AWS_ACCESS_KEY}");
+    streamConfigMap
+        .put(StreamConfigProperties.constructStreamProperty(streamType, "aws.secretKey"), "${AWS_SECRET_KEY}");
+    indexingConfig.setStreamConfigs(streamConfigMap);
+
+    setEnv(ImmutableMap.of("LOAD_MODE", "MMAP", "AWS_ACCESS_KEY", "default_aws_access_key", "AWS_SECRET_KEY",
+        "default_aws_secret_key"));
+
+    indexingConfig = ConfigUtils.applyConfigWithEnvVariables(indexingConfig);
+    assertEquals(indexingConfig.getLoadMode(), "MMAP");
+    assertTrue(indexingConfig.isAggregateMetrics());
+    assertEquals(indexingConfig.getInvertedIndexColumns(), invertedIndexColumns);
+    assertEquals(indexingConfig.getSortedColumn(), sortedColumn);
+    assertEquals(indexingConfig.getOnHeapDictionaryColumns(), onHeapDictionaryColumns);
+    assertEquals(indexingConfig.getBloomFilterColumns(), bloomFilterColumns);
+    assertEquals(indexingConfig.getNoDictionaryConfig(), noDictionaryConfig);
+    assertEquals(indexingConfig.getVarLengthDictionaryColumns(), varLengthDictionaryColumns);
+
+    // Mandatory values + defaults
+    StreamConfig streamConfig = new StreamConfig(tableName, indexingConfig.getStreamConfigs());
+    Assert.assertEquals(streamConfig.getType(), streamType);
+    Assert.assertEquals(streamConfig.getTopicName(), topic);
+    Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL);
+    Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), consumerFactoryClass);
+    Assert.assertEquals(streamConfig.getDecoderClass(), decoderClass);
+    Assert.assertEquals(streamConfig.getStreamConfigsMap().get("stream.fakeStream.aws.accessKey"),
+        "default_aws_access_key");
+    Assert.assertEquals(streamConfig.getStreamConfigsMap().get("stream.fakeStream.aws.secretKey"),
+        "default_aws_secret_key");
+    Assert.assertEquals(streamConfig.getDecoderProperties().size(), 0);
+    Assert
+        .assertEquals(streamConfig.getOffsetCriteria(), new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest());
+    Assert
+        .assertEquals(streamConfig.getConnectionTimeoutMillis(), StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
+    Assert.assertEquals(streamConfig.getFetchTimeoutMillis(), StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS);
+    Assert.assertEquals(streamConfig.getFlushThresholdRows(), StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
+    Assert.assertEquals(streamConfig.getFlushThresholdTimeMillis(), StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
+    Assert.assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(),
+        StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES);
+  }
+
+  /**
+   * Makes the given variables visible in the environment of the running JVM by reflecting into JDK internals.
+   *
+   * <p>First tries the {@code theEnvironment} and {@code theCaseInsensitiveEnvironment} fields of
+   * {@code java.lang.ProcessEnvironment}. If either field does not exist, falls back to the map wrapped by the
+   * unmodifiable map that {@link System#getenv()} returns. NOTE: the fallback clears that map first, so it replaces
+   * the whole environment instead of adding to it. The statement order is significant; do not reorder.
+   *
+   * @param newEnvVariablesMap variables to expose through {@link System#getenv()}
+   * @throws Exception if the reflective access fails
+   */
+  @SuppressWarnings("unchecked") // the reflected fields are untyped; the casts mirror how the maps are used below
+  private static void setEnv(Map<String, String> newEnvVariablesMap)
+      throws Exception {
+    try {
+      Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
+      Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
+      theEnvironmentField.setAccessible(true);
+      Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
+      env.putAll(newEnvVariablesMap);
+      Field theCaseInsensitiveEnvironmentField =
+          processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
+      theCaseInsensitiveEnvironmentField.setAccessible(true);
+      Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
+      cienv.putAll(newEnvVariablesMap);
+    } catch (NoSuchFieldException e) {
+      Class<?>[] classes = Collections.class.getDeclaredClasses();
+      Map<String, String> env = System.getenv();
+      for (Class<?> cl : classes) {
+        if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+          Field field = cl.getDeclaredField("m");
+          field.setAccessible(true);
+          Object obj = field.get(env);
+          Map<String, String> map = (Map<String, String>) obj;
+          map.clear();
+          map.putAll(newEnvVariablesMap);
+        }
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org