You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by yu...@apache.org on 2021/05/20 17:27:37 UTC

[incubator-pinot] branch master updated: Add collectionToJsonMode to schema inference (#6946)

This is an automated email from the ASF dual-hosted git repository.

yupeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f4ae3e0  Add collectionToJsonMode to schema inference (#6946)
f4ae3e0 is described below

commit f4ae3e0a0a46f75bd284a852c936bcb27bab1aa1
Author: Yupeng Fu <yu...@users.noreply.github.com>
AuthorDate: Thu May 20 10:27:08 2021 -0700

    Add collectionToJsonMode to schema inference (#6946)
---
 .../pinot/plugin/inputformat/avro/AvroUtils.java   | 41 ++++++++++++++++------
 .../plugin/inputformat/avro/AvroUtilsTest.java     | 20 +++++++++--
 .../java/org/apache/pinot/spi/utils/JsonUtils.java | 39 ++++++++++++++------
 .../org/apache/pinot/spi/utils/JsonUtilsTest.java  | 37 +++++++++++++------
 .../admin/command/AvroSchemaToPinotSchema.java     | 17 +++++++--
 .../tools/admin/command/JsonToPinotSchema.java     | 16 +++++++--
 6 files changed, 133 insertions(+), 37 deletions(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
index bbd7a79..ae72c85 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
@@ -32,6 +32,7 @@ import org.apache.avro.SchemaBuilder;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.DateTimeGranularitySpec;
@@ -81,17 +82,18 @@ public class AvroUtils {
    * @param timeUnit Time unit
    * @param unnestFields the fields to unnest
    * @param delimiter the delimiter to separate components in nested structure
+   * @param collectionToJsonMode the mode of converting collection to JSON
    *
    * @return Pinot schema
    */
   public static Schema getPinotSchemaFromAvroSchemaWithComplexTypeHandling(org.apache.avro.Schema avroSchema,
       @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit, List<String> unnestFields,
-      String delimiter) {
+      String delimiter, ComplexTypeConfig.CollectionToJsonMode collectionToJsonMode) {
     Schema pinotSchema = new Schema();
 
     for (Field field : avroSchema.getFields()) {
       extractSchemaWithComplexTypeHandling(field.schema(), unnestFields, delimiter, field.name(), pinotSchema,
-          fieldTypeMap, timeUnit);
+          fieldTypeMap, timeUnit, collectionToJsonMode);
     }
     return pinotSchema;
   }
@@ -134,18 +136,19 @@ public class AvroUtils {
    * @param complexType if allows complex-type handling
    * @param unnestFields the fields to unnest
    * @param delimiter the delimiter separating components in nested structure
+   * @param collectionToJsonMode the mode of converting collection to JSON string
    * @return Pinot schema
    */
   public static Schema getPinotSchemaFromAvroSchemaFile(File avroSchemaFile,
       @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit, boolean complexType,
-      List<String> unnestFields, String delimiter)
+      List<String> unnestFields, String delimiter, ComplexTypeConfig.CollectionToJsonMode collectionToJsonMode)
       throws IOException {
     org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(avroSchemaFile);
     if (!complexType) {
       return getPinotSchemaFromAvroSchema(avroSchema, fieldTypeMap, timeUnit);
     } else {
       return getPinotSchemaFromAvroSchemaWithComplexTypeHandling(avroSchema, fieldTypeMap, timeUnit, unnestFields,
-          delimiter);
+          delimiter, collectionToJsonMode);
     }
   }
 
@@ -283,7 +286,8 @@ public class AvroUtils {
 
   private static void extractSchemaWithComplexTypeHandling(org.apache.avro.Schema fieldSchema,
       List<String> unnestFields, String delimiter, String path, Schema pinotSchema,
-      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit) {
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit,
+      ComplexTypeConfig.CollectionToJsonMode collectionToJsonMode) {
     org.apache.avro.Schema.Type fieldType = fieldSchema.getType();
     switch (fieldType) {
       case UNION:
@@ -299,7 +303,7 @@ public class AvroUtils {
         }
         if (nonNullSchema != null) {
           extractSchemaWithComplexTypeHandling(nonNullSchema, unnestFields, delimiter, path, pinotSchema, fieldTypeMap,
-              timeUnit);
+              timeUnit, collectionToJsonMode);
         } else {
           throw new IllegalStateException("Cannot find non-null schema in UNION schema");
         }
@@ -307,20 +311,23 @@ public class AvroUtils {
       case RECORD:
         for (Field innerField : fieldSchema.getFields()) {
           extractSchemaWithComplexTypeHandling(innerField.schema(), unnestFields, delimiter,
-              String.join(delimiter, path, innerField.name()), pinotSchema, fieldTypeMap, timeUnit);
+              String.join(delimiter, path, innerField.name()), pinotSchema, fieldTypeMap, timeUnit,
+              collectionToJsonMode);
         }
         break;
       case ARRAY:
         org.apache.avro.Schema elementType = fieldSchema.getElementType();
         if (unnestFields.contains(path)) {
           extractSchemaWithComplexTypeHandling(elementType, unnestFields, delimiter, path, pinotSchema, fieldTypeMap,
-              timeUnit);
-        } else if (AvroSchemaUtil.isPrimitiveType(elementType.getType())) {
+              timeUnit, collectionToJsonMode);
+        } else if (collectionToJsonMode == ComplexTypeConfig.CollectionToJsonMode.NON_PRIMITIVE && AvroSchemaUtil
+            .isPrimitiveType(elementType.getType())) {
           addFieldToPinotSchema(pinotSchema, AvroSchemaUtil.valueOf(elementType.getType()), path, false, fieldTypeMap,
               timeUnit);
-        } else {
+        } else if (shallConvertToJson(collectionToJsonMode, elementType)) {
           addFieldToPinotSchema(pinotSchema, DataType.STRING, path, true, fieldTypeMap, timeUnit);
         }
+        // do not include the node for other cases
         break;
       default:
         DataType dataType = AvroSchemaUtil.valueOf(fieldType);
@@ -328,6 +335,20 @@ public class AvroUtils {
     }
   }
 
+  private static boolean shallConvertToJson(ComplexTypeConfig.CollectionToJsonMode collectionToJsonMode,
+      org.apache.avro.Schema elementType) {
+    switch (collectionToJsonMode) {
+      case ALL:
+        return true;
+      case NONE:
+        return false;
+      case NON_PRIMITIVE:
+        return !AvroSchemaUtil.isPrimitiveType(elementType.getType());
+      default:
+        throw new IllegalArgumentException(String.format("Unsupported collectionToJsonMode %s", collectionToJsonMode));
+    }
+  }
+
   private static void addFieldToPinotSchema(Schema pinotSchema, DataType dataType, String name,
       boolean isSingleValueField, @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap,
       @Nullable TimeUnit timeUnit) {
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroUtilsTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroUtilsTest.java
index 1667511..e0140ea 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroUtilsTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroUtilsTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.FieldSpec.FieldType;
@@ -89,7 +90,7 @@ public class AvroUtilsTest {
             .put("hoursSinceEpoch", FieldType.TIME).put("m1", FieldType.METRIC).build();
     Schema inferredPinotSchema = AvroUtils
         .getPinotSchemaFromAvroSchemaWithComplexTypeHandling(avroSchema, fieldSpecMap, TimeUnit.HOURS,
-            new ArrayList<>(), ".");
+            new ArrayList<>(), ".", ComplexTypeConfig.CollectionToJsonMode.NON_PRIMITIVE);
     Schema expectedSchema =
         new Schema.SchemaBuilder().addSingleValueDimension("d1", DataType.STRING).addMetric("m1", DataType.INT)
             .addSingleValueDimension("tuple.streetaddress", DataType.STRING)
@@ -101,7 +102,7 @@ public class AvroUtilsTest {
     // unnest collection entries
     inferredPinotSchema = AvroUtils
         .getPinotSchemaFromAvroSchemaWithComplexTypeHandling(avroSchema, fieldSpecMap, TimeUnit.HOURS,
-            Lists.newArrayList("entries"), ".");
+            Lists.newArrayList("entries"), ".", ComplexTypeConfig.CollectionToJsonMode.NON_PRIMITIVE);
     expectedSchema =
         new Schema.SchemaBuilder().addSingleValueDimension("d1", DataType.STRING).addMetric("m1", DataType.INT)
             .addSingleValueDimension("tuple.streetaddress", DataType.STRING)
@@ -113,7 +114,7 @@ public class AvroUtilsTest {
     // change delimiter
     inferredPinotSchema = AvroUtils
         .getPinotSchemaFromAvroSchemaWithComplexTypeHandling(avroSchema, fieldSpecMap, TimeUnit.HOURS,
-            Lists.newArrayList(), "_");
+            Lists.newArrayList(), "_", ComplexTypeConfig.CollectionToJsonMode.NON_PRIMITIVE);
     expectedSchema =
         new Schema.SchemaBuilder().addSingleValueDimension("d1", DataType.STRING).addMetric("m1", DataType.INT)
             .addSingleValueDimension("tuple_streetaddress", DataType.STRING)
@@ -121,5 +122,18 @@ public class AvroUtilsTest {
             .addMultiValueDimension("d2", DataType.INT)
             .addTime(new TimeGranularitySpec(DataType.LONG, TimeUnit.HOURS, "hoursSinceEpoch"), null).build();
     Assert.assertEquals(expectedSchema, inferredPinotSchema);
+
+    // change the handling of collection-to-json option, d2 will become string
+    inferredPinotSchema = AvroUtils
+        .getPinotSchemaFromAvroSchemaWithComplexTypeHandling(avroSchema, fieldSpecMap, TimeUnit.HOURS,
+            Lists.newArrayList("entries"), ".", ComplexTypeConfig.CollectionToJsonMode.ALL);
+    expectedSchema =
+        new Schema.SchemaBuilder().addSingleValueDimension("d1", DataType.STRING).addMetric("m1", DataType.INT)
+            .addSingleValueDimension("tuple.streetaddress", DataType.STRING)
+            .addSingleValueDimension("tuple.city", DataType.STRING).addSingleValueDimension("entries.id", DataType.LONG)
+            .addSingleValueDimension("entries.description", DataType.STRING)
+            .addSingleValueDimension("d2", DataType.STRING)
+            .addTime(new TimeGranularitySpec(DataType.LONG, TimeUnit.HOURS, "hoursSinceEpoch"), null).build();
+    Assert.assertEquals(expectedSchema, inferredPinotSchema);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
index c427640..ecdd6a9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
@@ -43,6 +43,7 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.DateTimeGranularitySpec;
@@ -409,33 +410,34 @@ public class JsonUtils {
 
   public static Schema getPinotSchemaFromJsonFile(File jsonFile,
       @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit,
-      @Nullable List<String> unnestFields, String delimiter)
+      @Nullable List<String> unnestFields, String delimiter,
+      ComplexTypeConfig.CollectionToJsonMode collectionToJsonMode)
       throws IOException {
     JsonNode jsonNode = fileToFirstJsonNode(jsonFile);
     if (unnestFields == null) {
       unnestFields = new ArrayList<>();
     }
     Preconditions.checkState(jsonNode.isObject(), "the JSON data shall be an object");
-    return getPinotSchemaFromJsonNode(jsonNode, fieldTypeMap, timeUnit, unnestFields, delimiter);
+    return getPinotSchemaFromJsonNode(jsonNode, fieldTypeMap, timeUnit, unnestFields, delimiter, collectionToJsonMode);
   }
 
   public static Schema getPinotSchemaFromJsonNode(JsonNode jsonNode,
       @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit, List<String> unnestFields,
-      String delimiter) {
+      String delimiter, ComplexTypeConfig.CollectionToJsonMode collectionToJsonMode) {
     Schema pinotSchema = new Schema();
     Iterator<Map.Entry<String, JsonNode>> fieldIterator = jsonNode.fields();
     while (fieldIterator.hasNext()) {
       Map.Entry<String, JsonNode> fieldEntry = fieldIterator.next();
       JsonNode childNode = fieldEntry.getValue();
       inferPinotSchemaFromJsonNode(childNode, pinotSchema, fieldEntry.getKey(), fieldTypeMap, timeUnit, unnestFields,
-          delimiter);
+          delimiter, collectionToJsonMode);
     }
     return pinotSchema;
   }
 
   private static void inferPinotSchemaFromJsonNode(JsonNode jsonNode, Schema pinotSchema, String path,
       @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit, List<String> unnestFields,
-      String delimiter) {
+      String delimiter, ComplexTypeConfig.CollectionToJsonMode collectionToJsonMode) {
     if (jsonNode.isNull()) {
       // do nothing
       return;
@@ -451,25 +453,42 @@ public class JsonUtils {
       JsonNode childNode = jsonNode.get(0);
 
       if (unnestFields.contains(path)) {
-        inferPinotSchemaFromJsonNode(childNode, pinotSchema, path, fieldTypeMap, timeUnit, unnestFields, delimiter);
-      } else if (childNode.isValueNode()) {
-        addFieldToPinotSchema(pinotSchema, valueOf(childNode), path, false, fieldTypeMap, timeUnit);
-      } else {
+        inferPinotSchemaFromJsonNode(childNode, pinotSchema, path, fieldTypeMap, timeUnit, unnestFields, delimiter,
+            collectionToJsonMode);
+      } else if (shallConvertToJson(collectionToJsonMode, childNode)) {
         addFieldToPinotSchema(pinotSchema, DataType.STRING, path, true, fieldTypeMap, timeUnit);
+      } else if (collectionToJsonMode == ComplexTypeConfig.CollectionToJsonMode.NON_PRIMITIVE && childNode
+          .isValueNode()) {
+        addFieldToPinotSchema(pinotSchema, valueOf(childNode), path, false, fieldTypeMap, timeUnit);
       }
+      // do not include the node for other cases
     } else if (jsonNode.isObject()) {
       Iterator<Map.Entry<String, JsonNode>> fieldIterator = jsonNode.fields();
       while (fieldIterator.hasNext()) {
         Map.Entry<String, JsonNode> fieldEntry = fieldIterator.next();
         JsonNode childNode = fieldEntry.getValue();
         inferPinotSchemaFromJsonNode(childNode, pinotSchema, String.join(delimiter, path, fieldEntry.getKey()),
-            fieldTypeMap, timeUnit, unnestFields, delimiter);
+            fieldTypeMap, timeUnit, unnestFields, delimiter, collectionToJsonMode);
       }
     } else {
       throw new IllegalArgumentException(String.format("Unsupported json node type", jsonNode.getClass()));
     }
   }
 
+  private static boolean shallConvertToJson(ComplexTypeConfig.CollectionToJsonMode collectionToJsonMode,
+      JsonNode childNode) {
+    switch (collectionToJsonMode) {
+      case ALL:
+        return true;
+      case NONE:
+        return false;
+      case NON_PRIMITIVE:
+        return !childNode.isValueNode();
+      default:
+        throw new IllegalArgumentException(String.format("Unsupported collectionToJsonMode %s", collectionToJsonMode));
+    }
+  }
+
   /**
    * Returns the data type stored in Pinot that is associated with the given Avro type.
    */
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java
index 4e47264..4c2167c 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.testng.Assert;
@@ -273,21 +274,22 @@ public class JsonUtilsTest {
     Map<String, FieldSpec.FieldType> fieldSpecMap =
         new ImmutableMap.Builder<String, FieldSpec.FieldType>().put("d1", FieldSpec.FieldType.DIMENSION)
             .put("hoursSinceEpoch", FieldSpec.FieldType.DATE_TIME).put("m1", FieldSpec.FieldType.METRIC).build();
-    Schema inferredPinotSchema =
-        JsonUtils.getPinotSchemaFromJsonFile(file, fieldSpecMap, TimeUnit.HOURS, new ArrayList<>(), ".");
+    Schema inferredPinotSchema = JsonUtils
+        .getPinotSchemaFromJsonFile(file, fieldSpecMap, TimeUnit.HOURS, new ArrayList<>(), ".",
+            ComplexTypeConfig.CollectionToJsonMode.NON_PRIMITIVE);
     Schema expectedSchema = new Schema.SchemaBuilder().addSingleValueDimension("d1", FieldSpec.DataType.STRING)
         .addMetric("m1", FieldSpec.DataType.INT)
         .addSingleValueDimension("tuple.address.streetaddress", FieldSpec.DataType.STRING)
         .addSingleValueDimension("tuple.address.city", FieldSpec.DataType.STRING)
         .addSingleValueDimension("entries", FieldSpec.DataType.STRING)
         .addMultiValueDimension("d2", FieldSpec.DataType.INT)
-        .addDateTime("hoursSinceEpoch",FieldSpec.DataType.INT, "1:HOURS:EPOCH","1:HOURS")
-        .build();
+        .addDateTime("hoursSinceEpoch", FieldSpec.DataType.INT, "1:HOURS:EPOCH", "1:HOURS").build();
     Assert.assertEquals(inferredPinotSchema, expectedSchema);
 
     // unnest collection entries
-    inferredPinotSchema =
-        JsonUtils.getPinotSchemaFromJsonFile(file, fieldSpecMap, TimeUnit.HOURS, Lists.newArrayList("entries"), ".");
+    inferredPinotSchema = JsonUtils
+        .getPinotSchemaFromJsonFile(file, fieldSpecMap, TimeUnit.HOURS, Lists.newArrayList("entries"), ".",
+            ComplexTypeConfig.CollectionToJsonMode.NON_PRIMITIVE);
     expectedSchema = new Schema.SchemaBuilder().addSingleValueDimension("d1", FieldSpec.DataType.STRING)
         .addMetric("m1", FieldSpec.DataType.INT)
         .addSingleValueDimension("tuple.address.streetaddress", FieldSpec.DataType.STRING)
@@ -295,19 +297,34 @@ public class JsonUtilsTest {
         .addSingleValueDimension("entries.id", FieldSpec.DataType.INT)
         .addSingleValueDimension("entries.description", FieldSpec.DataType.STRING)
         .addMultiValueDimension("d2", FieldSpec.DataType.INT)
-        .addDateTime("hoursSinceEpoch",FieldSpec.DataType.INT, "1:HOURS:EPOCH","1:HOURS").build();
+        .addDateTime("hoursSinceEpoch", FieldSpec.DataType.INT, "1:HOURS:EPOCH", "1:HOURS").build();
     Assert.assertEquals(inferredPinotSchema, expectedSchema);
 
     // change delimiter
-    inferredPinotSchema =
-        JsonUtils.getPinotSchemaFromJsonFile(file, fieldSpecMap, TimeUnit.HOURS, Lists.newArrayList(""), "_");
+    inferredPinotSchema = JsonUtils
+        .getPinotSchemaFromJsonFile(file, fieldSpecMap, TimeUnit.HOURS, Lists.newArrayList(""), "_",
+            ComplexTypeConfig.CollectionToJsonMode.NON_PRIMITIVE);
     expectedSchema = new Schema.SchemaBuilder().addSingleValueDimension("d1", FieldSpec.DataType.STRING)
         .addMetric("m1", FieldSpec.DataType.INT)
         .addSingleValueDimension("tuple_address_streetaddress", FieldSpec.DataType.STRING)
         .addSingleValueDimension("tuple_address_city", FieldSpec.DataType.STRING)
         .addSingleValueDimension("entries", FieldSpec.DataType.STRING)
         .addMultiValueDimension("d2", FieldSpec.DataType.INT)
-        .addDateTime("hoursSinceEpoch",FieldSpec.DataType.INT, "1:HOURS:EPOCH","1:HOURS").build();
+        .addDateTime("hoursSinceEpoch", FieldSpec.DataType.INT, "1:HOURS:EPOCH", "1:HOURS").build();
+    Assert.assertEquals(inferredPinotSchema, expectedSchema);
+
+    // change the handling of collection-to-json option, d2 will become string
+    inferredPinotSchema = JsonUtils
+        .getPinotSchemaFromJsonFile(file, fieldSpecMap, TimeUnit.HOURS, Lists.newArrayList("entries"), ".",
+            ComplexTypeConfig.CollectionToJsonMode.ALL);
+    expectedSchema = new Schema.SchemaBuilder().addSingleValueDimension("d1", FieldSpec.DataType.STRING)
+        .addMetric("m1", FieldSpec.DataType.INT)
+        .addSingleValueDimension("tuple.address.streetaddress", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("tuple.address.city", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("entries.id", FieldSpec.DataType.INT)
+        .addSingleValueDimension("entries.description", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("d2", FieldSpec.DataType.STRING)
+        .addDateTime("hoursSinceEpoch", FieldSpec.DataType.INT, "1:HOURS:EPOCH", "1:HOURS").build();
     Assert.assertEquals(inferredPinotSchema, expectedSchema);
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AvroSchemaToPinotSchema.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AvroSchemaToPinotSchema.java
index 423fb26..34ea580 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AvroSchemaToPinotSchema.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AvroSchemaToPinotSchema.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
+import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.tools.Command;
@@ -76,6 +77,9 @@ public class AvroSchemaToPinotSchema extends AbstractBaseAdminCommand implements
   @Option(name = "-complexType", metaVar = "<boolean>", usage = "allow complex-type handling, default to false")
   boolean _complexType;
 
+  @Option(name = "-collectionToJsonMode", metaVar = "<string>", usage = "The mode of converting collection to JSON string, can be NONE/NON_PRIMITIVE/ALL")
+  String _collectionToJsonMode;
+
   @SuppressWarnings("FieldCanBeLocal")
   @Option(name = "-help", help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
@@ -93,7 +97,7 @@ public class AvroSchemaToPinotSchema extends AbstractBaseAdminCommand implements
     if (_avroSchemaFile != null) {
       schema = AvroUtils
           .getPinotSchemaFromAvroSchemaFile(new File(_avroSchemaFile), buildFieldTypesMap(), _timeUnit, _complexType,
-              buildUnnestFields(), getDelimiter());
+              buildUnnestFields(), getDelimiter(), getCollectionToJsonMode());
     } else if (_avroDataFile != null) {
       schema = AvroUtils.getPinotSchemaFromAvroDataFile(new File(_avroDataFile), buildFieldTypesMap(), _timeUnit);
     } else {
@@ -132,7 +136,9 @@ public class AvroSchemaToPinotSchema extends AbstractBaseAdminCommand implements
   public String toString() {
     return "AvroSchemaToPinotSchema -avroSchemaFile " + _avroSchemaFile + " -avroDataFile " + _avroDataFile
         + " -outputDir " + _outputDir + " -pinotSchemaName " + _pinotSchemaName + " -dimensions " + _dimensions
-        + " -metrics " + _metrics + " -timeColumnName " + _timeColumnName + " -timeUnit " + _timeUnit;
+        + " -metrics " + _metrics + " -timeColumnName " + _timeColumnName + " -timeUnit " + _timeUnit
+        + " _unnestFields " + _unnestFields + " _delimiter " + _delimiter + " _complexType " + _complexType
+        + " _collectionToJsonMode " + _collectionToJsonMode;
   }
 
   /**
@@ -169,6 +175,13 @@ public class AvroSchemaToPinotSchema extends AbstractBaseAdminCommand implements
     return unnestFields;
   }
 
+  private ComplexTypeConfig.CollectionToJsonMode getCollectionToJsonMode() {
+    if (_collectionToJsonMode == null) {
+      return ComplexTypeTransformer.DEFAULT_COLLECTION_TO_JSON_MODE;
+    }
+    return ComplexTypeConfig.CollectionToJsonMode.valueOf(_collectionToJsonMode);
+  }
+
   private String getDelimiter() {
     return _delimiter == null ? ComplexTypeTransformer.DEFAULT_DELIMITER : _delimiter;
   }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/JsonToPinotSchema.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/JsonToPinotSchema.java
index eb29477..0a63bfc 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/JsonToPinotSchema.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/JsonToPinotSchema.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
+import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -70,6 +71,9 @@ public class JsonToPinotSchema extends AbstractBaseAdminCommand implements Comma
   @Option(name = "-delimiter", metaVar = "<string>", usage = "The delimiter separating components in nested structure, default to dot")
   String _delimiter;
 
+  @Option(name = "-collectionToJsonMode", metaVar = "<string>", usage = "The mode of converting collection to JSON string, can be NONE/NON_PRIMITIVE/ALL")
+  String _collectionToJsonMode;
+
   @SuppressWarnings("FieldCanBeLocal")
   @Option(name = "-help", help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
@@ -86,7 +90,7 @@ public class JsonToPinotSchema extends AbstractBaseAdminCommand implements Comma
     Schema schema;
     schema = JsonUtils
         .getPinotSchemaFromJsonFile(new File(_jsonFile), buildFieldTypesMap(), _timeUnit, buildUnnestFields(),
-            getDelimiter());
+            getDelimiter(), getCollectionToJsonMode());
     schema.setSchemaName(_pinotSchemaName);
 
     File outputDir = new File(_outputDir);
@@ -118,7 +122,8 @@ public class JsonToPinotSchema extends AbstractBaseAdminCommand implements Comma
   public String toString() {
     return "JsonToPinotSchema -jsonFile " + _jsonFile + " -outputDir " + _outputDir + " -pinotSchemaName "
         + _pinotSchemaName + " -dimensions " + _dimensions + " -metrics " + _metrics + " -timeColumnName "
-        + _dateTimeColumnName + " -timeUnit " + _timeUnit;
+        + _dateTimeColumnName + " -timeUnit " + _timeUnit + " _unnestFields " + _unnestFields + " _delimiter "
+        + _delimiter + " _collectionToJsonMode " + _collectionToJsonMode;
   }
 
   /**
@@ -155,6 +160,13 @@ public class JsonToPinotSchema extends AbstractBaseAdminCommand implements Comma
     return unnestFields;
   }
 
+  private ComplexTypeConfig.CollectionToJsonMode getCollectionToJsonMode() {
+    if (_collectionToJsonMode == null) {
+      return ComplexTypeTransformer.DEFAULT_COLLECTION_TO_JSON_MODE;
+    }
+    return ComplexTypeConfig.CollectionToJsonMode.valueOf(_collectionToJsonMode);
+  }
+
   private String getDelimiter() {
     return _delimiter == null ? ComplexTypeTransformer.DEFAULT_DELIMITER : _delimiter;
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org