You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by yu...@apache.org on 2021/05/20 17:27:21 UTC

[incubator-pinot] branch master updated: Add the complex-type support to decoder/reader (#6945)

This is an automated email from the ASF dual-hosted git repository.

yupeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e400566  Add the complex-type support to decoder/reader (#6945)
e400566 is described below

commit e40056621a6b6ffd2201717d3da5dcccdecd7257
Author: Yupeng Fu <yu...@users.noreply.github.com>
AuthorDate: Thu May 20 10:26:27 2021 -0700

    Add the complex-type support to decoder/reader (#6945)
    
    * update decoder to support complex-type
    
    * comments
    
    * fix imports
---
 .../pinot/segment/local/utils/IngestionUtils.java  | 22 ++++++++++
 ...bEvents_offline_complexTypeHandling_schema.json | 50 +++++++++++++++-------
 ...s_offline_complexTypeHandling_table_config.json |  4 --
 ...eHandling_meetupRsvp_realtime_table_config.json | 12 +-----
 .../complexTypeHandling_meetupRsvp_schema.json     | 44 ++++++++++++-------
 5 files changed, 87 insertions(+), 45 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
index d1a5b14..06bc61e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
@@ -32,6 +32,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.segment.local.function.FunctionEvaluator;
 import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator;
@@ -41,6 +42,7 @@ import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
 import org.apache.pinot.spi.auth.AuthContext;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
@@ -298,10 +300,30 @@ public final class IngestionUtils {
     Set<String> fieldsForRecordExtractor = new HashSet<>();
     extractFieldsFromIngestionConfig(ingestionConfig, fieldsForRecordExtractor);
     extractFieldsFromSchema(schema, fieldsForRecordExtractor);
+    fieldsForRecordExtractor = getFieldsToReadWithComplexType(fieldsForRecordExtractor, ingestionConfig);
     return fieldsForRecordExtractor;
   }
 
   /**
+   * Extracts the root-level names from the fields, to support the complex-type handling. For example,
+   * a field a.b.c will return the top-level name a. Returns fieldsToRead as-is when no complex-type config is set.
+   */
+  private static Set<String> getFieldsToReadWithComplexType(Set<String> fieldsToRead, IngestionConfig ingestionConfig) {
+    if (ingestionConfig == null || ingestionConfig.getComplexTypeConfig() == null) {
+      return fieldsToRead;
+    }
+    ComplexTypeConfig complexTypeConfig = ingestionConfig.getComplexTypeConfig();
+    String delimiter = complexTypeConfig.getDelimiter() == null ? ComplexTypeTransformer.DEFAULT_DELIMITER
+        : complexTypeConfig.getDelimiter();
+    Set<String> result = new HashSet<>();
+    for (String field : fieldsToRead) {
+      // Split on the delimiter as a whole string; StringUtils.split would treat each of its characters as a separator
+      result.add(StringUtils.splitByWholeSeparator(field, delimiter)[0]);
+    }
+    return result;
+  }
+
+  /**
    * Extracts all the fields needed by the {@link org.apache.pinot.spi.data.readers.RecordExtractor} from the given Schema
    * TODO: for now, we assume that arguments to transform function are in the source i.e. no columns are derived from transformed columns
    */
diff --git a/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_schema.json b/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_schema.json
index 30de6b5..dffd623 100644
--- a/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_schema.json
+++ b/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_schema.json
@@ -9,21 +9,41 @@
       "dataType": "STRING"
     },
     {
-      "name": "payload.commits.sha",
-      "dataType": "STRING"
-    },
-    {
-      "name": "payload.commits.author.name",
-      "dataType": "STRING"
-    },
-    {
-      "name": "payload.commits.author.email",
-      "dataType": "STRING"
-    },
-    {
-      "name": "payload_json",
-      "dataType": "STRING",
-      "maxLength": 2147483647
+      "name" : "payload.push_id",
+      "dataType" : "LONG"
+    }, {
+      "name" : "payload.size",
+      "dataType" : "INT"
+    }, {
+      "name" : "payload.distinct_size",
+      "dataType" : "INT"
+    }, {
+      "name" : "payload.ref",
+      "dataType" : "STRING"
+    }, {
+      "name" : "payload.head",
+      "dataType" : "STRING"
+    }, {
+      "name" : "payload.before",
+      "dataType" : "STRING"
+    }, {
+      "name" : "payload.commits.sha",
+      "dataType" : "STRING"
+    }, {
+      "name" : "payload.commits.author.name",
+      "dataType" : "STRING"
+    }, {
+      "name" : "payload.commits.author.email",
+      "dataType" : "STRING"
+    }, {
+      "name" : "payload.commits.message",
+      "dataType" : "STRING"
+    }, {
+      "name" : "payload.commits.distinct",
+      "dataType" : "BOOLEAN"
+    }, {
+      "name" : "payload.commits.url",
+      "dataType" : "STRING"
     }
   ],
   "dateTimeFieldSpecs": [
diff --git a/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_table_config.json b/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_table_config.json
index de496de..c72cf3c 100644
--- a/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_table_config.json
+++ b/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_table_config.json
@@ -16,10 +16,6 @@
       {
         "columnName": "created_at_timestamp",
         "transformFunction": "fromDateTime(created_at, 'yyyy-MM-dd''T''HH:mm:ss''Z''')"
-      },
-      {
-        "columnName": "payload_json",
-        "transformFunction": "jsonFormat(\"payload\")"
       }
     ],
     "complexTypeConfig": {
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json
index ce85aea..6b787a0 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json
@@ -26,23 +26,13 @@
       ]
     },
     "transformConfigs": [
-      {
-        "columnName": "group_json",
-        "transformFunction": "jsonFormat(\"group\")"
-      }
     ],
     "complexTypeConfig": {
       "unnestFields": ["group.group_topics"]
     }
   },
   "tableIndexConfig": {
-    "loadMode": "MMAP",
-    "noDictionaryColumns": [
-      "group_json"
-    ],
-    "jsonIndexColumns": [
-      "group_json"
-    ]
+    "loadMode": "MMAP"
   },
   "metadata": {
     "customConfigs": {}
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json
index 825e963..8974ccf 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json
@@ -2,21 +2,35 @@
   "schemaName": "meetupRsvp",
   "dimensionFieldSpecs": [
     {
-      "name": "group_json",
-      "dataType": "STRING",
-      "maxLength": 2147483647
-    },
-    {
-      "name": "group.group_topics.urlkey",
-      "dataType": "STRING"
-    },
-    {
-      "name": "group.group_topics.topic_name",
-      "dataType": "STRING"
-    },
-    {
-      "name": "group.group_id",
-      "dataType": "LONG"
+      "name" : "group.group_topics.urlkey",
+      "dataType" : "STRING"
+    }, {
+      "name" : "group.group_topics.topic_name",
+      "dataType" : "STRING"
+    }, {
+      "name" : "group.group_city",
+      "dataType" : "STRING"
+    }, {
+      "name" : "group.group_country",
+      "dataType" : "STRING"
+    }, {
+      "name" : "group.group_id",
+      "dataType" : "INT"
+    }, {
+      "name" : "group.group_name",
+      "dataType" : "STRING"
+    }, {
+      "name" : "group.group_lon",
+      "dataType" : "DOUBLE"
+    }, {
+      "name" : "group.group_urlname",
+      "dataType" : "STRING"
+    }, {
+      "name" : "group.group_state",
+      "dataType" : "STRING"
+    }, {
+      "name" : "group.group_lat",
+      "dataType" : "DOUBLE"
     },
     {
       "name": "rsvp_id",

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org