You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/11/01 22:50:52 UTC
[pinot] branch master updated: Add a quick start with both UPSERT and JSON index (#7669)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cc2d63c Add a quick start with both UPSERT and JSON index (#7669)
cc2d63c is described below
commit cc2d63c92b8586a988a7514f66cd24a139b35ebc
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Nov 1 15:50:36 2021 -0700
Add a quick start with both UPSERT and JSON index (#7669)
---
.../segment/local/utils/TableConfigUtils.java | 4 +-
.../apache/pinot/tools/UpsertJsonQuickStart.java | 117 +++++++++++++++++++++
.../tools/admin/command/QuickStartCommand.java | 7 +-
.../pinot/tools/streams/MeetupRsvpJsonStream.java | 19 +++-
...sert_json_meetupRsvp_realtime_table_config.json | 101 ++++++++++++++++++
.../meetupRsvp/upsert_json_meetupRsvp_schema.json | 50 +++++++++
6 files changed, 294 insertions(+), 4 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 703d856..05877fa 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -678,12 +678,12 @@ public final class TableConfigUtils {
Preconditions.checkArgument(fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY,
"FST Index is only enabled on dictionary encoded columns");
Preconditions.checkState(fieldConfigColSpec.isSingleValueField()
- && fieldConfigColSpec.getDataType() == DataType.STRING,
+ && fieldConfigColSpec.getDataType().getStoredType() == DataType.STRING,
"FST Index is only supported for single value string columns");
break;
case TEXT:
Preconditions.checkState(fieldConfigColSpec.isSingleValueField()
- && fieldConfigColSpec.getDataType() == DataType.STRING,
+ && fieldConfigColSpec.getDataType().getStoredType() == DataType.STRING,
"TEXT Index is only supported for single value string columns");
break;
default:
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
new file mode 100644
index 0000000..b654606
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.pinot.tools.Quickstart.Color;
+import org.apache.pinot.tools.admin.PinotAdministrator;
+import org.apache.pinot.tools.admin.command.QuickstartRunner;
+import org.apache.pinot.tools.streams.MeetupRsvpJsonStream;
+import org.apache.pinot.tools.utils.KafkaStarterUtils;
+
+import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
+import static org.apache.pinot.tools.Quickstart.printStatus;
+
+
+public class UpsertJsonQuickStart extends QuickStartBase {
+ private StreamDataServerStartable _kafkaStarter;
+
+ public static void main(String[] args)
+ throws Exception {
+ List<String> arguments = new ArrayList<>();
+ arguments.addAll(Arrays.asList("QuickStart", "-type", "UPSERT-JSON-INDEX"));
+ arguments.addAll(Arrays.asList(args));
+ PinotAdministrator.main(arguments.toArray(new String[0]));
+ }
+
+ public void execute()
+ throws Exception {
+ File quickstartTmpDir = new File(_tmpDir, String.valueOf(System.currentTimeMillis()));
+ File baseDir = new File(quickstartTmpDir, "meetupRsvp");
+ File dataDir = new File(baseDir, "data");
+ Preconditions.checkState(dataDir.mkdirs());
+
+ File schemaFile = new File(baseDir, "meetupRsvp_schema.json");
+ File tableConfigFile = new File(baseDir, "meetupRsvp_realtime_table_config.json");
+
+ ClassLoader classLoader = Quickstart.class.getClassLoader();
+ URL resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json");
+ Preconditions.checkNotNull(resource);
+ FileUtils.copyURLToFile(resource, schemaFile);
+ resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json");
+ Preconditions.checkNotNull(resource);
+ FileUtils.copyURLToFile(resource, tableConfigFile);
+
+ QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
+ QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir);
+
+ printStatus(Color.CYAN, "***** Starting Kafka *****");
+ ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
+ try {
+ _kafkaStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
+ KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
+ }
+ _kafkaStarter.start();
+ _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2));
+ printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****");
+ MeetupRsvpJsonStream meetupRSVPProvider = new MeetupRsvpJsonStream(true);
+ meetupRSVPProvider.run();
+ printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****");
+ runner.startAll();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ printStatus(Color.GREEN, "***** Shutting down realtime quick start *****");
+ runner.stop();
+ meetupRSVPProvider.stopPublishing();
+ _kafkaStarter.stop();
+ ZkStarter.stopLocalZkServer(zookeeperInstance);
+ FileUtils.deleteDirectory(quickstartTmpDir);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }));
+ printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****");
+ runner.bootstrapTable();
+ printStatus(Color.CYAN, "***** Waiting for 20 seconds for a few events to get populated *****");
+ Thread.sleep(20000);
+
+ printStatus(Color.YELLOW, "***** Upsert json-index quickstart setup complete *****");
+
+ String q1 = "select json_extract_scalar(event_json, '$.event_name', 'STRING') from meetupRsvp where json_match"
+ + "(group_json, '\"$.group_topics[*].topic_name\"=''Fitness''') limit 10";
+ printStatus(Color.YELLOW, "Events related to fitness");
+ printStatus(Color.CYAN, "Query : " + q1);
+ printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
+
+ printStatus(Color.GREEN, "***************************************************");
+ printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console");
+ }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickStartCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickStartCommand.java
index 7e116c8..92fd15d 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickStartCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickStartCommand.java
@@ -31,6 +31,7 @@ import org.apache.pinot.tools.RealtimeComplexTypeHandlingQuickStart;
import org.apache.pinot.tools.RealtimeJsonIndexQuickStart;
import org.apache.pinot.tools.RealtimeQuickStart;
import org.apache.pinot.tools.RealtimeQuickStartWithMinion;
+import org.apache.pinot.tools.UpsertJsonQuickStart;
import org.apache.pinot.tools.UpsertQuickStart;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
@@ -105,8 +106,8 @@ public class QuickStartCommand extends AbstractBaseAdminCommand implements Comma
case "BATCH-MINION":
quickstart = new BatchQuickstartWithMinion();
break;
- case "REALTIME-MINION":
case "REALTIME_MINION":
+ case "REALTIME-MINION":
quickstart = new RealtimeQuickStartWithMinion();
break;
case "REALTIME":
@@ -134,6 +135,10 @@ public class QuickStartCommand extends AbstractBaseAdminCommand implements Comma
case "STREAM-JSON-INDEX":
quickstart = new RealtimeJsonIndexQuickStart();
break;
+ case "UPSERT_JSON_INDEX":
+ case "UPSERT-JSON-INDEX":
+ quickstart = new UpsertJsonQuickStart();
+ break;
case "OFFLINE_COMPLEX_TYPE":
case "OFFLINE-COMPLEX-TYPE":
case "BATCH_COMPLEX_TYPE":
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
index 9a3a0ae..624d277 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.tools.streams;
+import com.fasterxml.jackson.databind.JsonNode;
import javax.websocket.MessageHandler;
+import org.apache.pinot.spi.utils.JsonUtils;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -30,11 +32,26 @@ public class MeetupRsvpJsonStream extends MeetupRsvpStream {
super();
}
+ public MeetupRsvpJsonStream(boolean partitionByKey)
+ throws Exception {
+ super(partitionByKey);
+ }
+
@Override
protected MessageHandler.Whole<String> getMessageHandler() {
return message -> {
if (_keepPublishing) {
- _producer.produce("meetupRSVPEvents", message.getBytes(UTF_8));
+ if (_partitionByKey) {
+ try {
+ JsonNode messageJson = JsonUtils.stringToJsonNode(message);
+ String rsvpId = messageJson.get("rsvp_id").asText();
+ _producer.produce("meetupRSVPEvents", rsvpId.getBytes(UTF_8), message.getBytes(UTF_8));
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while processing the message: {}", message, e);
+ }
+ } else {
+ _producer.produce("meetupRSVPEvents", message.getBytes(UTF_8));
+ }
}
};
}
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json
new file mode 100644
index 0000000..149587e
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json
@@ -0,0 +1,101 @@
+{
+ "tableName": "meetupRsvp",
+ "tableType": "REALTIME",
+ "tenants": {},
+ "segmentsConfig": {
+ "timeColumnName": "mtime",
+ "timeType": "MILLISECONDS",
+ "segmentPushType": "APPEND",
+ "replicasPerPartition": "1",
+ "retentionTimeUnit": "DAYS",
+ "retentionTimeValue": "1"
+ },
+ "ingestionConfig": {
+ "streamIngestionConfig": {
+ "streamConfigMaps": [
+ {
+ "streamType": "kafka",
+ "stream.kafka.consumer.type": "lowLevel",
+ "stream.kafka.topic.name": "meetupRSVPEvents",
+ "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+ "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
+ "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
+ "stream.kafka.zk.broker.url": "localhost:2191/kafka",
+ "stream.kafka.broker.list": "localhost:19092"
+ }
+ ]
+ },
+ "transformConfigs": [
+ {
+ "columnName": "event_json",
+ "transformFunction": "jsonFormat(event)"
+ },
+ {
+ "columnName": "group_json",
+ "transformFunction": "jsonFormat(\"group\")"
+ },
+ {
+ "columnName": "member_json",
+ "transformFunction": "jsonFormat(member)"
+ },
+ {
+ "columnName": "venue_json",
+ "transformFunction": "jsonFormat(venue)"
+ }
+ ]
+ },
+ "tableIndexConfig": {
+ "loadMode": "MMAP",
+ "noDictionaryColumns": [
+ "event_json",
+ "group_json",
+ "member_json",
+ "venue_json"
+ ],
+ "jsonIndexColumns": [
+ "event_json",
+ "group_json",
+ "member_json",
+ "venue_json"
+ ]
+ },
+ "fieldConfigList": [
+ {
+ "name": "event_json",
+ "encodingType": "RAW",
+ "indexTypes": [
+ "TEXT"
+ ]
+ },
+ {
+ "name": "group_json",
+ "encodingType": "RAW",
+ "indexTypes": [
+ "TEXT"
+ ]
+ },
+ {
+ "name": "member_json",
+ "encodingType": "RAW",
+ "indexTypes": [
+ "TEXT"
+ ]
+ },
+ {
+ "name": "venue_json",
+ "encodingType": "RAW",
+ "indexTypes": [
+ "TEXT"
+ ]
+ }
+ ],
+ "routing": {
+ "instanceSelectorType": "strictReplicaGroup"
+ },
+ "upsertConfig": {
+ "mode": "FULL"
+ },
+ "metadata": {
+ "customConfigs": {}
+ }
+}
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json
new file mode 100644
index 0000000..9112c73
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json
@@ -0,0 +1,50 @@
+{
+ "schemaName": "meetupRsvp",
+ "dimensionFieldSpecs": [
+ {
+ "name": "event_json",
+ "dataType": "JSON"
+ },
+ {
+ "name": "group_json",
+ "dataType": "JSON"
+ },
+ {
+ "name": "member_json",
+ "dataType": "JSON"
+ },
+ {
+ "name": "venue_json",
+ "dataType": "JSON"
+ },
+ {
+ "name": "response",
+ "dataType": "STRING"
+ },
+ {
+ "name": "rsvp_id",
+ "dataType": "LONG"
+ },
+ {
+ "name": "visibility",
+ "dataType": "STRING"
+ }
+ ],
+ "metricFieldSpecs": [
+ {
+ "name": "guests",
+ "dataType": "INT"
+ }
+ ],
+ "dateTimeFieldSpecs": [
+ {
+ "name": "mtime",
+ "dataType": "TIMESTAMP",
+ "format": "1:MILLISECONDS:TIMESTAMP",
+ "granularity": "1:MILLISECONDS"
+ }
+ ],
+ "primaryKeyColumns": [
+ "rsvp_id"
+ ]
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org