You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by yu...@apache.org on 2021/05/20 17:27:21 UTC
[incubator-pinot] branch master updated: Add the complex-type
support to decoder/reader (#6945)
This is an automated email from the ASF dual-hosted git repository.
yupeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e400566 Add the complex-type support to decoder/reader (#6945)
e400566 is described below
commit e40056621a6b6ffd2201717d3da5dcccdecd7257
Author: Yupeng Fu <yu...@users.noreply.github.com>
AuthorDate: Thu May 20 10:26:27 2021 -0700
Add the complex-type support to decoder/reader (#6945)
* update decoder to support complex-type
* comments
* fix imports
---
.../pinot/segment/local/utils/IngestionUtils.java | 22 ++++++++++
...bEvents_offline_complexTypeHandling_schema.json | 50 +++++++++++++++-------
...s_offline_complexTypeHandling_table_config.json | 4 --
...eHandling_meetupRsvp_realtime_table_config.json | 12 +-----
.../complexTypeHandling_meetupRsvp_schema.json | 44 ++++++++++++-------
5 files changed, 87 insertions(+), 45 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
index d1a5b14..06bc61e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
@@ -32,6 +32,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.segment.local.function.FunctionEvaluator;
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator;
@@ -41,6 +42,7 @@ import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
import org.apache.pinot.spi.auth.AuthContext;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
@@ -298,10 +300,30 @@ public final class IngestionUtils {
Set<String> fieldsForRecordExtractor = new HashSet<>();
extractFieldsFromIngestionConfig(ingestionConfig, fieldsForRecordExtractor);
extractFieldsFromSchema(schema, fieldsForRecordExtractor);
+ fieldsForRecordExtractor = getFieldsToReadWithComplexType(fieldsForRecordExtractor, ingestionConfig);
return fieldsForRecordExtractor;
}
/**
+ * Extracts the root-level names from the fields, to support the complex-type handling. For example, with the
+ * delimiter ".", a field a.b.c will return the top-level name a. The delimiter is matched as a whole string.
+ */
+ private static Set<String> getFieldsToReadWithComplexType(Set<String> fieldsToRead, IngestionConfig ingestionConfig) {
+ if (ingestionConfig == null || ingestionConfig.getComplexTypeConfig() == null) {
+ // no complex-type handling is configured, so the fields are returned unchanged
+ return fieldsToRead;
+ }
+ ComplexTypeConfig complexTypeConfig = ingestionConfig.getComplexTypeConfig();
+ Set<String> result = new HashSet<>();
+ String delimiter = complexTypeConfig.getDelimiter() == null ? ComplexTypeTransformer.DEFAULT_DELIMITER
+ : complexTypeConfig.getDelimiter();
+ for (String field : fieldsToRead) {
+ result.add(StringUtils.splitByWholeSeparator(field, delimiter)[0]);
+ }
+ return result;
+ }
+
+ /**
* Extracts all the fields needed by the {@link org.apache.pinot.spi.data.readers.RecordExtractor} from the given Schema
* TODO: for now, we assume that arguments to transform function are in the source i.e. no columns are derived from transformed columns
*/
diff --git a/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_schema.json b/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_schema.json
index 30de6b5..dffd623 100644
--- a/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_schema.json
+++ b/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_schema.json
@@ -9,21 +9,41 @@
"dataType": "STRING"
},
{
- "name": "payload.commits.sha",
- "dataType": "STRING"
- },
- {
- "name": "payload.commits.author.name",
- "dataType": "STRING"
- },
- {
- "name": "payload.commits.author.email",
- "dataType": "STRING"
- },
- {
- "name": "payload_json",
- "dataType": "STRING",
- "maxLength": 2147483647
+ "name" : "payload.push_id",
+ "dataType" : "LONG"
+ }, {
+ "name" : "payload.size",
+ "dataType" : "INT"
+ }, {
+ "name" : "payload.distinct_size",
+ "dataType" : "INT"
+ }, {
+ "name" : "payload.ref",
+ "dataType" : "STRING"
+ }, {
+ "name" : "payload.head",
+ "dataType" : "STRING"
+ }, {
+ "name" : "payload.before",
+ "dataType" : "STRING"
+ }, {
+ "name" : "payload.commits.sha",
+ "dataType" : "STRING"
+ }, {
+ "name" : "payload.commits.author.name",
+ "dataType" : "STRING"
+ }, {
+ "name" : "payload.commits.author.email",
+ "dataType" : "STRING"
+ }, {
+ "name" : "payload.commits.message",
+ "dataType" : "STRING"
+ }, {
+ "name" : "payload.commits.distinct",
+ "dataType" : "BOOLEAN"
+ }, {
+ "name" : "payload.commits.url",
+ "dataType" : "STRING"
}
],
"dateTimeFieldSpecs": [
diff --git a/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_table_config.json b/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_table_config.json
index de496de..c72cf3c 100644
--- a/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_table_config.json
+++ b/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_table_config.json
@@ -16,10 +16,6 @@
{
"columnName": "created_at_timestamp",
"transformFunction": "fromDateTime(created_at, 'yyyy-MM-dd''T''HH:mm:ss''Z''')"
- },
- {
- "columnName": "payload_json",
- "transformFunction": "jsonFormat(\"payload\")"
}
],
"complexTypeConfig": {
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json
index ce85aea..6b787a0 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json
@@ -26,23 +26,13 @@
]
},
"transformConfigs": [
- {
- "columnName": "group_json",
- "transformFunction": "jsonFormat(\"group\")"
- }
],
"complexTypeConfig": {
"unnestFields": ["group.group_topics"]
}
},
"tableIndexConfig": {
- "loadMode": "MMAP",
- "noDictionaryColumns": [
- "group_json"
- ],
- "jsonIndexColumns": [
- "group_json"
- ]
+ "loadMode": "MMAP"
},
"metadata": {
"customConfigs": {}
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json
index 825e963..8974ccf 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json
@@ -2,21 +2,35 @@
"schemaName": "meetupRsvp",
"dimensionFieldSpecs": [
{
- "name": "group_json",
- "dataType": "STRING",
- "maxLength": 2147483647
- },
- {
- "name": "group.group_topics.urlkey",
- "dataType": "STRING"
- },
- {
- "name": "group.group_topics.topic_name",
- "dataType": "STRING"
- },
- {
- "name": "group.group_id",
- "dataType": "LONG"
+ "name" : "group.group_topics.urlkey",
+ "dataType" : "STRING"
+ }, {
+ "name" : "group.group_topics.topic_name",
+ "dataType" : "STRING"
+ }, {
+ "name" : "group.group_city",
+ "dataType" : "STRING"
+ }, {
+ "name" : "group.group_country",
+ "dataType" : "STRING"
+ }, {
+ "name" : "group.group_id",
+ "dataType" : "INT"
+ }, {
+ "name" : "group.group_name",
+ "dataType" : "STRING"
+ }, {
+ "name" : "group.group_lon",
+ "dataType" : "DOUBLE"
+ }, {
+ "name" : "group.group_urlname",
+ "dataType" : "STRING"
+ }, {
+ "name" : "group.group_state",
+ "dataType" : "STRING"
+ }, {
+ "name" : "group.group_lat",
+ "dataType" : "DOUBLE"
},
{
"name": "rsvp_id",
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org