You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/05/17 06:25:06 UTC

[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6928: add complex-type support to avro-to-pinot schema inference

Jackie-Jiang commented on a change in pull request #6928:
URL: https://github.com/apache/incubator-pinot/pull/6928#discussion_r633250076



##########
File path: pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
##########
@@ -277,4 +280,86 @@ public static DataType extractFieldDataType(Field field) {
       return fieldSchema;
     }
   }
+
+  private static void extractSchemaWithComplexTypeHandling(org.apache.avro.Schema fieldSchema,
+      List<String> unnestFields, String delimiter, String path, Schema pinotSchema,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit) {
+    org.apache.avro.Schema.Type fieldType = fieldSchema.getType();
+    if (fieldType == org.apache.avro.Schema.Type.UNION) {
+      org.apache.avro.Schema nonNullSchema = null;
+      for (org.apache.avro.Schema childFieldSchema : fieldSchema.getTypes()) {
+        if (childFieldSchema.getType() != org.apache.avro.Schema.Type.NULL) {
+          if (nonNullSchema == null) {
+            nonNullSchema = childFieldSchema;
+          } else {
+            throw new IllegalStateException("More than one non-null schema in UNION schema");
+          }
+        }
+      }
+      if (nonNullSchema != null) {
+        extractSchemaWithComplexTypeHandling(nonNullSchema, unnestFields, delimiter, path, pinotSchema, fieldTypeMap,
+            timeUnit);
+      } else {
+        throw new IllegalStateException("Cannot find non-null schema in UNION schema");
+      }
+    } else if (fieldType == org.apache.avro.Schema.Type.RECORD) {
+      for (Field innerField : fieldSchema.getFields()) {
+        extractSchemaWithComplexTypeHandling(innerField.schema(), unnestFields, delimiter,
+            concat(delimiter, path, innerField.name()), pinotSchema, fieldTypeMap, timeUnit);
+      }
+    } else if (fieldType == org.apache.avro.Schema.Type.ARRAY) {
+      org.apache.avro.Schema elementType = fieldSchema.getElementType();
+      if (unnestFields.contains(path)) {
+        extractSchemaWithComplexTypeHandling(elementType, unnestFields, delimiter, path, pinotSchema, fieldTypeMap,
+            timeUnit);
+      } else if (AvroSchemaUtil.isPrimitiveType(elementType.getType())) {
+        addFieldToPinotSchema(pinotSchema, AvroSchemaUtil.valueOf(elementType.getType()), path, false, fieldTypeMap,
+            timeUnit);
+      } else {
+        addFieldToPinotSchema(pinotSchema, DataType.STRING, path, true, fieldTypeMap, timeUnit);
+      }
+    } else {
+      DataType dataType = AvroSchemaUtil.valueOf(fieldType);
+      addFieldToPinotSchema(pinotSchema, dataType, path, true, fieldTypeMap, timeUnit);
+    }
+  }
+
+  private static void addFieldToPinotSchema(Schema pinotSchema, DataType dataType, String name,
+      boolean isSingleValueField, @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap,
+      @Nullable TimeUnit timeUnit) {
+    if (fieldTypeMap == null) {
+      pinotSchema.addField(new DimensionFieldSpec(name, dataType, isSingleValueField));
+    } else {
+      FieldSpec.FieldType fieldType =
+          fieldTypeMap.containsKey(name) ? fieldTypeMap.get(name) : FieldSpec.FieldType.DIMENSION;
+      Preconditions.checkNotNull(fieldType, "Field type not specified for field: %s", name);
+      switch (fieldType) {
+        case DIMENSION:
+          pinotSchema.addField(new DimensionFieldSpec(name, dataType, isSingleValueField));
+          break;
+        case METRIC:
+          Preconditions.checkState(isSingleValueField, "Metric field: %s cannot be multi-valued", name);
+          pinotSchema.addField(new MetricFieldSpec(name, dataType));
+          break;
+        case TIME:
+          Preconditions.checkState(isSingleValueField, "Time field: %s cannot be multi-valued", name);
+          Preconditions.checkNotNull(timeUnit, "Time unit cannot be null");
+          pinotSchema.addField(new TimeFieldSpec(new TimeGranularitySpec(dataType, timeUnit, name)));
+          break;
+        case DATE_TIME:
+          Preconditions.checkState(isSingleValueField, "Time field: %s cannot be multi-valued", name);
+          Preconditions.checkNotNull(timeUnit, "Time unit cannot be null");
+          pinotSchema.addField(new DateTimeFieldSpec(name, dataType,
+              new DateTimeFormatSpec(1, timeUnit.toString(), DateTimeFieldSpec.TimeFormat.EPOCH.toString()).getFormat(),
+              new DateTimeGranularitySpec(1, timeUnit).getGranularity()));
+          break;
+        default:
+          throw new UnsupportedOperationException("Unsupported field type: " + fieldType + " for field: " + name);
+      }
+    }
+  }
+
+  private static String concat(String delimiter, String path, String component) {

Review comment:
       Inlining this method directly might be more readable?

##########
File path: pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
##########
@@ -277,4 +280,86 @@ public static DataType extractFieldDataType(Field field) {
       return fieldSchema;
     }
   }
+
+  private static void extractSchemaWithComplexTypeHandling(org.apache.avro.Schema fieldSchema,
+      List<String> unnestFields, String delimiter, String path, Schema pinotSchema,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit) {
+    org.apache.avro.Schema.Type fieldType = fieldSchema.getType();
+    if (fieldType == org.apache.avro.Schema.Type.UNION) {

Review comment:
       Consider using a switch statement on the field type here?

##########
File path: pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroSchemaUtil.java
##########
@@ -52,6 +52,25 @@ public static DataType valueOf(Schema.Type avroType) {
     }
   }
 
+  /**
+   * @return if the given avro type is a primitive type.
+   */
+  public static boolean isPrimitiveType(Schema.Type avroType) {
+    switch (avroType) {
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case BOOLEAN:
+      case STRING:
+      case ENUM:
+      case BYTES:

Review comment:
       Currently Pinot does not support multi-value (MV) BYTES, so I'm not sure we want to count it as a primitive type here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org