You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/05/18 00:05:21 UTC

[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6930: add command to infer pinot schema from json data

Jackie-Jiang commented on a change in pull request #6930:
URL: https://github.com/apache/incubator-pinot/pull/6930#discussion_r633937767



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
##########
@@ -380,4 +389,120 @@ private static void unnestResults(List<Map<String, String>> currentResults,
       unnestResults(newCurrentResults, nestedResultsList, index + 1, nonNestedResult, outputResults);
     }
   }
+
+  public static Schema getPinotSchemaFromJsonFile(File jsonFile,

Review comment:
       I don't think this method is very useful: it requires the whole JSON file to be a single object, whereas in the typical case the file contains a list of records.

##########
File path: pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
##########
@@ -93,48 +94,7 @@
 
   //@formatter:off
   @Argument(handler = SubCommandHandler.class, metaVar = "<subCommand>")
-  @SubCommands({
-      @SubCommand(name = "QuickStart", impl = QuickStartCommand.class),
-      @SubCommand(name = "OperateClusterConfig", impl = OperateClusterConfigCommand.class),
-      @SubCommand(name = "GenerateData", impl = GenerateDataCommand.class),
-      @SubCommand(name = "LaunchDataIngestionJob", impl = LaunchDataIngestionJobCommand.class),
-      @SubCommand(name = "CreateSegment", impl = CreateSegmentCommand.class),
-      @SubCommand(name = "ImportData", impl = ImportDataCommand.class),
-      @SubCommand(name = "StartZookeeper", impl = StartZookeeperCommand.class),
-      @SubCommand(name = "StartKafka", impl = StartKafkaCommand.class),
-      @SubCommand(name = "StreamAvroIntoKafka", impl = StreamAvroIntoKafkaCommand.class),
-      @SubCommand(name = "StartController", impl = StartControllerCommand.class),
-      @SubCommand(name = "StartBroker", impl = StartBrokerCommand.class),
-      @SubCommand(name = "StartServer", impl = StartServerCommand.class),
-      @SubCommand(name = "StartMinion", impl = StartMinionCommand.class),
-      @SubCommand(name = "StartServiceManager", impl = StartServiceManagerCommand.class),
-      @SubCommand(name = "AddTable", impl = AddTableCommand.class),
-      @SubCommand(name = "ChangeTableState", impl = ChangeTableState.class),
-      @SubCommand(name = "AddTenant", impl = AddTenantCommand.class),
-      @SubCommand(name = "AddSchema", impl = AddSchemaCommand.class),
-      @SubCommand(name = "UpdateSchema", impl = AddSchemaCommand.class),
-      @SubCommand(name = "UploadSegment", impl = UploadSegmentCommand.class),
-      @SubCommand(name = "PostQuery", impl = PostQueryCommand.class),
-      @SubCommand(name = "StopProcess", impl = StopProcessCommand.class),
-      @SubCommand(name = "DeleteCluster", impl = DeleteClusterCommand.class),
-      @SubCommand(name = "ShowClusterInfo", impl = ShowClusterInfoCommand.class),
-      @SubCommand(name = "AvroSchemaToPinotSchema", impl = AvroSchemaToPinotSchema.class),
-      @SubCommand(name = "RebalanceTable", impl = RebalanceTableCommand.class),
-      @SubCommand(name = "ChangeNumReplicas", impl = ChangeNumReplicasCommand.class),
-      @SubCommand(name = "ValidateConfig", impl = ValidateConfigCommand.class),
-      @SubCommand(name = "VerifySegmentState", impl = VerifySegmentState.class),
-      @SubCommand(name = "ConvertPinotSegment", impl = PinotSegmentConvertCommand.class),
-      @SubCommand(name = "MoveReplicaGroup", impl = MoveReplicaGroup.class),
-      @SubCommand(name = "VerifyClusterState", impl = VerifyClusterStateCommand.class),
-      @SubCommand(name = "RealtimeProvisioningHelper", impl = RealtimeProvisioningHelperCommand.class),
-      @SubCommand(name = "MergeSegments", impl = SegmentMergeCommand.class),
-      @SubCommand(name = "CheckOfflineSegmentIntervals", impl = OfflineSegmentIntervalCheckerCommand.class),
-      @SubCommand(name = "AnonymizeData", impl = AnonymizeDataCommand.class),
-      @SubCommand(name = "GitHubEventsQuickStart", impl = GitHubEventsQuickStartCommand.class),
-      @SubCommand(name = "StreamGitHubEvents", impl = StreamGitHubEventsCommand.class),
-      @SubCommand(name = "BootstrapTable", impl = BootstrapTableCommand.class),
-      @SubCommand(name = "SegmentProcessorFramework", impl = SegmentProcessorFrameworkCommand.class)
-  })
+  @SubCommands({@SubCommand(name = "QuickStart", impl = QuickStartCommand.class), @SubCommand(name = "OperateClusterConfig", impl = OperateClusterConfigCommand.class), @SubCommand(name = "GenerateData", impl = GenerateDataCommand.class), @SubCommand(name = "LaunchDataIngestionJob", impl = LaunchDataIngestionJobCommand.class), @SubCommand(name = "CreateSegment", impl = CreateSegmentCommand.class), @SubCommand(name = "ImportData", impl = ImportDataCommand.class), @SubCommand(name = "StartZookeeper", impl = StartZookeeperCommand.class), @SubCommand(name = "StartKafka", impl = StartKafkaCommand.class), @SubCommand(name = "StreamAvroIntoKafka", impl = StreamAvroIntoKafkaCommand.class), @SubCommand(name = "StartController", impl = StartControllerCommand.class), @SubCommand(name = "StartBroker", impl = StartBrokerCommand.class), @SubCommand(name = "StartServer", impl = StartServerCommand.class), @SubCommand(name = "StartMinion", impl = StartMinionCommand.class), @SubCommand(name = "StartSe
 rviceManager", impl = StartServiceManagerCommand.class), @SubCommand(name = "AddTable", impl = AddTableCommand.class), @SubCommand(name = "ChangeTableState", impl = ChangeTableState.class), @SubCommand(name = "AddTenant", impl = AddTenantCommand.class), @SubCommand(name = "AddSchema", impl = AddSchemaCommand.class), @SubCommand(name = "UpdateSchema", impl = AddSchemaCommand.class), @SubCommand(name = "UploadSegment", impl = UploadSegmentCommand.class), @SubCommand(name = "PostQuery", impl = PostQueryCommand.class), @SubCommand(name = "StopProcess", impl = StopProcessCommand.class), @SubCommand(name = "DeleteCluster", impl = DeleteClusterCommand.class), @SubCommand(name = "ShowClusterInfo", impl = ShowClusterInfoCommand.class), @SubCommand(name = "AvroSchemaToPinotSchema", impl = AvroSchemaToPinotSchema.class), @SubCommand(name = "JsonToPinotSchema", impl = JsonToPinotSchema.class), @SubCommand(name = "RebalanceTable", impl = RebalanceTableCommand.class), @SubCommand(name = "ChangeNu
 mReplicas", impl = ChangeNumReplicasCommand.class), @SubCommand(name = "ValidateConfig", impl = ValidateConfigCommand.class), @SubCommand(name = "VerifySegmentState", impl = VerifySegmentState.class), @SubCommand(name = "ConvertPinotSegment", impl = PinotSegmentConvertCommand.class), @SubCommand(name = "MoveReplicaGroup", impl = MoveReplicaGroup.class), @SubCommand(name = "VerifyClusterState", impl = VerifyClusterStateCommand.class), @SubCommand(name = "RealtimeProvisioningHelper", impl = RealtimeProvisioningHelperCommand.class), @SubCommand(name = "MergeSegments", impl = SegmentMergeCommand.class), @SubCommand(name = "CheckOfflineSegmentIntervals", impl = OfflineSegmentIntervalCheckerCommand.class), @SubCommand(name = "AnonymizeData", impl = AnonymizeDataCommand.class), @SubCommand(name = "GitHubEventsQuickStart", impl = GitHubEventsQuickStartCommand.class), @SubCommand(name = "StreamGitHubEvents", impl = StreamGitHubEventsCommand.class), @SubCommand(name = "BootstrapTable", impl =
  BootstrapTableCommand.class), @SubCommand(name = "SegmentProcessorFramework", impl = SegmentProcessorFrameworkCommand.class)})

Review comment:
       Please revert this reformatting. The formatter should already be disabled for this block by the `//@formatter:off` marker.

##########
File path: pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/JsonToPinotSchema.java
##########
@@ -0,0 +1,143 @@
+package org.apache.pinot.tools.admin.command;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.tools.Command;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class for command to infer pinot schema from Json data. Given that it is not always possible to
+ * automatically do this, the intention is to get most of the work done by this class, and require any
+ * manual editing on top.
+ */
+public class JsonToPinotSchema extends AbstractBaseAdminCommand implements Command {
+  private static final Logger LOGGER = LoggerFactory.getLogger(JsonToPinotSchema.class);
+
+  @Option(name = "-jsonFile", required = true, metaVar = "<String>", usage = "Path to json file.")
+  String _jsonFile;
+
+  @Option(name = "-outputDir", required = true, metaVar = "<string>", usage = "Path to output directory")
+  String _outputDir;
+
+  @Option(name = "-pinotSchemaName", required = true, metaVar = "<string>", usage = "Pinot schema name")
+  String _pinotSchemaName;
+
+  @Option(name = "-dimensions", metaVar = "<string>", usage = "Comma separated dimension column names.")
+  String _dimensions;
+
+  @Option(name = "-metrics", metaVar = "<string>", usage = "Comma separated metric column names.")
+  String _metrics;
+
+  @Option(name = "-timeColumnName", metaVar = "<string>", usage = "Name of the time column.")

Review comment:
       The TIME field type is already deprecated. Should this option be changed to `-dateTimeColumns`?

##########
File path: pinot-spi/src/test/resources/json_util_test.json
##########
@@ -0,0 +1,25 @@
+{
+  "entries": [
+    {
+      "id": 1234,
+      "description": "entry1"
+    },
+    {
+      "id": 1235,
+      "description": "entry2"
+    }
+  ],
+  "tuple": {
+    "address": {
+      "streetaddress": "1st Ave",
+      "city": "Palo Alto"
+    }
+  },
+  "d2": [
+    1,
+    2
+  ],
+  "d1": "dim1",
+  "hoursSinceEpoch": 1621286582,
+  "m1": 12
+}

Review comment:
       Please add a newline at the end of the file.

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
##########
@@ -380,4 +389,120 @@ private static void unnestResults(List<Map<String, String>> currentResults,
       unnestResults(newCurrentResults, nestedResultsList, index + 1, nonNestedResult, outputResults);
     }
   }
+
+  public static Schema getPinotSchemaFromJsonFile(File jsonFile,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit, List<String> unnestFields,

Review comment:
       `unnestFields` should also be annotated `@Nullable`, with a null check before calls such as `unnestFields.contains(path)`.

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
##########
@@ -380,4 +389,120 @@ private static void unnestResults(List<Map<String, String>> currentResults,
       unnestResults(newCurrentResults, nestedResultsList, index + 1, nonNestedResult, outputResults);
     }
   }
+
+  public static Schema getPinotSchemaFromJsonFile(File jsonFile,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit, List<String> unnestFields,
+      String delimiter)
+      throws IOException {
+    JsonNode jsonNode = fileToJsonNode(jsonFile);
+    Preconditions.checkState(jsonNode.isObject(), "the JSON data shall be an object");
+    return getPinotSchemaFromJsonNode(jsonNode, fieldTypeMap, timeUnit, unnestFields, delimiter);
+  }
+
+  public static Schema getPinotSchemaFromJsonNode(JsonNode jsonNode,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit, List<String> unnestFields,
+      String delimiter) {
+    Schema pinotSchema = new Schema();
+    Iterator<Map.Entry<String, JsonNode>> fieldIterator = jsonNode.fields();
+    while (fieldIterator.hasNext()) {
+      Map.Entry<String, JsonNode> fieldEntry = fieldIterator.next();
+      JsonNode childNode = fieldEntry.getValue();
+      inferPinotSchemaFromJsonNode(childNode, pinotSchema, fieldEntry.getKey(), fieldTypeMap, timeUnit, unnestFields,
+          delimiter);
+    }
+    return pinotSchema;
+  }
+
+  private static void inferPinotSchemaFromJsonNode(JsonNode jsonNode, Schema pinotSchema, String path,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit, List<String> unnestFields,
+      String delimiter) {
+    if (jsonNode.isNull()) {
+      // do nothing
+      return;

Review comment:
       Should we throw an exception when the schema cannot be inferred?

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
##########
@@ -380,4 +389,120 @@ private static void unnestResults(List<Map<String, String>> currentResults,
       unnestResults(newCurrentResults, nestedResultsList, index + 1, nonNestedResult, outputResults);
     }
   }
+
+  public static Schema getPinotSchemaFromJsonFile(File jsonFile,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit, List<String> unnestFields,
+      String delimiter)
+      throws IOException {
+    JsonNode jsonNode = fileToJsonNode(jsonFile);
+    Preconditions.checkState(jsonNode.isObject(), "the JSON data shall be an object");
+    return getPinotSchemaFromJsonNode(jsonNode, fieldTypeMap, timeUnit, unnestFields, delimiter);
+  }
+
+  public static Schema getPinotSchemaFromJsonNode(JsonNode jsonNode,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit, List<String> unnestFields,
+      String delimiter) {
+    Schema pinotSchema = new Schema();
+    Iterator<Map.Entry<String, JsonNode>> fieldIterator = jsonNode.fields();
+    while (fieldIterator.hasNext()) {
+      Map.Entry<String, JsonNode> fieldEntry = fieldIterator.next();
+      JsonNode childNode = fieldEntry.getValue();
+      inferPinotSchemaFromJsonNode(childNode, pinotSchema, fieldEntry.getKey(), fieldTypeMap, timeUnit, unnestFields,
+          delimiter);
+    }
+    return pinotSchema;
+  }
+
+  private static void inferPinotSchemaFromJsonNode(JsonNode jsonNode, Schema pinotSchema, String path,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit, List<String> unnestFields,
+      String delimiter) {
+    if (jsonNode.isNull()) {
+      // do nothing
+      return;
+    } else if (jsonNode.isValueNode()) {
+      DataType dataType = valueOf(jsonNode);
+      addFieldToPinotSchema(pinotSchema, dataType, path, true, fieldTypeMap, timeUnit);
+    } else if (jsonNode.isArray()) {
+      int numChildren = jsonNode.size();
+      if (numChildren == 0) {
+        // do nothing
+        return;
+      }
+      JsonNode childNode = jsonNode.get(0);
+
+      if (unnestFields.contains(path)) {
+        inferPinotSchemaFromJsonNode(childNode, pinotSchema, path, fieldTypeMap, timeUnit, unnestFields, delimiter);
+      } else if (childNode.isValueNode()) {
+        addFieldToPinotSchema(pinotSchema, valueOf(childNode), path, false, fieldTypeMap, timeUnit);
+      } else {
+        addFieldToPinotSchema(pinotSchema, DataType.STRING, path, true, fieldTypeMap, timeUnit);
+      }
+    } else if (jsonNode.isObject()) {
+      Iterator<Map.Entry<String, JsonNode>> fieldIterator = jsonNode.fields();
+      while (fieldIterator.hasNext()) {
+        Map.Entry<String, JsonNode> fieldEntry = fieldIterator.next();
+        JsonNode childNode = fieldEntry.getValue();
+        inferPinotSchemaFromJsonNode(childNode, pinotSchema, String.join(delimiter, path, fieldEntry.getKey()),
+            fieldTypeMap, timeUnit, unnestFields, delimiter);
+      }
+    } else {
+      throw new IllegalArgumentException(String.format("Unsupported json node type", jsonNode.getClass()));
+    }
+  }
+
+  /**
+   * Returns the data type stored in Pinot that is associated with the given Avro type.
+   */
+  public static DataType valueOf(JsonNode jsonNode) {
+    if (jsonNode.isInt()) {
+      return DataType.INT;
+    } else if (jsonNode.isLong()) {
+      return DataType.LONG;
+    } else if (jsonNode.isFloat()) {
+      return DataType.FLOAT;
+    } else if (jsonNode.isDouble()) {
+      return DataType.DOUBLE;
+    } else if (jsonNode.isBoolean()) {
+      return DataType.BOOLEAN;
+    } else if (jsonNode.isBinary()) {
+      return DataType.BYTES;
+    } else {
+      return DataType.STRING;
+    }
+  }
+
+  private static void addFieldToPinotSchema(Schema pinotSchema, DataType dataType, String name,
+      boolean isSingleValueField, @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap,
+      @Nullable TimeUnit timeUnit) {
+    if (fieldTypeMap == null) {
+      pinotSchema.addField(new DimensionFieldSpec(name, dataType, isSingleValueField));
+    } else {
+      FieldSpec.FieldType fieldType =
+          fieldTypeMap.containsKey(name) ? fieldTypeMap.get(name) : FieldSpec.FieldType.DIMENSION;
+      Preconditions.checkNotNull(fieldType, "Field type not specified for field: %s", name);
+      switch (fieldType) {
+        case DIMENSION:
+          pinotSchema.addField(new DimensionFieldSpec(name, dataType, isSingleValueField));
+          break;
+        case METRIC:
+          Preconditions.checkState(isSingleValueField, "Metric field: %s cannot be multi-valued", name);
+          pinotSchema.addField(new MetricFieldSpec(name, dataType));
+          break;
+        case TIME:

Review comment:
       The TIME field type is already deprecated. Let's support only `DATE_TIME` here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org