You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/11/01 22:50:52 UTC

[pinot] branch master updated: Add a quick start with both UPSERT and JSON index (#7669)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cc2d63c  Add a quick start with both UPSERT and JSON index (#7669)
cc2d63c is described below

commit cc2d63c92b8586a988a7514f66cd24a139b35ebc
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Nov 1 15:50:36 2021 -0700

    Add a quick start with both UPSERT and JSON index (#7669)
---
 .../segment/local/utils/TableConfigUtils.java      |   4 +-
 .../apache/pinot/tools/UpsertJsonQuickStart.java   | 117 +++++++++++++++++++++
 .../tools/admin/command/QuickStartCommand.java     |   7 +-
 .../pinot/tools/streams/MeetupRsvpJsonStream.java  |  19 +++-
 ...sert_json_meetupRsvp_realtime_table_config.json | 101 ++++++++++++++++++
 .../meetupRsvp/upsert_json_meetupRsvp_schema.json  |  50 +++++++++
 6 files changed, 294 insertions(+), 4 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 703d856..05877fa 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -678,12 +678,12 @@ public final class TableConfigUtils {
               Preconditions.checkArgument(fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY,
                   "FST Index is only enabled on dictionary encoded columns");
               Preconditions.checkState(fieldConfigColSpec.isSingleValueField()
-                      && fieldConfigColSpec.getDataType() == DataType.STRING,
+                      && fieldConfigColSpec.getDataType().getStoredType() == DataType.STRING,
                   "FST Index is only supported for single value string columns");
               break;
             case TEXT:
               Preconditions.checkState(fieldConfigColSpec.isSingleValueField()
-                      && fieldConfigColSpec.getDataType() == DataType.STRING,
+                      && fieldConfigColSpec.getDataType().getStoredType() == DataType.STRING,
                   "TEXT Index is only supported for single value string columns");
               break;
             default:
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
new file mode 100644
index 0000000..b654606
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.pinot.tools.Quickstart.Color;
+import org.apache.pinot.tools.admin.PinotAdministrator;
+import org.apache.pinot.tools.admin.command.QuickstartRunner;
+import org.apache.pinot.tools.streams.MeetupRsvpJsonStream;
+import org.apache.pinot.tools.utils.KafkaStarterUtils;
+
+import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
+import static org.apache.pinot.tools.Quickstart.printStatus;
+
+
+/**
+ * Quick start that runs a realtime "meetupRsvp" table with both UPSERT and JSON index enabled.
+ *
+ * <p>It copies the example schema and table config from the classpath into a temporary directory, starts a local
+ * ZooKeeper and Kafka, publishes the meetup RSVP stream with partition-by-key enabled, starts the Pinot components
+ * through {@link QuickstartRunner}, bootstraps the table and finally runs one sample JSON query.
+ */
+public class UpsertJsonQuickStart extends QuickStartBase {
+  private StreamDataServerStartable _kafkaStarter;
+
+  /**
+   * Launches this quick start through {@link PinotAdministrator}, as if "QuickStart -type UPSERT-JSON-INDEX" had been
+   * given on the command line, followed by any extra arguments.
+   *
+   * @param args extra command line arguments appended after the quick start type
+   */
+  public static void main(String[] args)
+      throws Exception {
+    List<String> arguments = new ArrayList<>();
+    arguments.addAll(Arrays.asList("QuickStart", "-type", "UPSERT-JSON-INDEX"));
+    arguments.addAll(Arrays.asList(args));
+    PinotAdministrator.main(arguments.toArray(new String[0]));
+  }
+
+  /**
+   * Sets up the temporary working directory, starts all services, bootstraps the table and prints the result of a
+   * sample query. A JVM shutdown hook stops the services and deletes the temporary directory.
+   *
+   * @throws Exception if any setup step fails
+   */
+  public void execute()
+      throws Exception {
+    File quickstartTmpDir = new File(_tmpDir, String.valueOf(System.currentTimeMillis()));
+    File baseDir = new File(quickstartTmpDir, "meetupRsvp");
+    File dataDir = new File(baseDir, "data");
+    Preconditions.checkState(dataDir.mkdirs());
+
+    File schemaFile = new File(baseDir, "meetupRsvp_schema.json");
+    File tableConfigFile = new File(baseDir, "meetupRsvp_realtime_table_config.json");
+
+    // The upsert-json specific resources are copied under the generic meetupRsvp file names inside baseDir
+    ClassLoader classLoader = Quickstart.class.getClassLoader();
+    URL resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json");
+    Preconditions.checkNotNull(resource);
+    FileUtils.copyURLToFile(resource, schemaFile);
+    resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json");
+    Preconditions.checkNotNull(resource);
+    FileUtils.copyURLToFile(resource, tableConfigFile);
+
+    QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
+    QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir);
+
+    printStatus(Color.CYAN, "***** Starting Kafka *****");
+    ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
+    try {
+      _kafkaStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
+          KafkaStarterUtils.getDefaultKafkaConfiguration(zookeeperInstance));
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
+    }
+    _kafkaStarter.start();
+    _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2));
+    printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****");
+    MeetupRsvpJsonStream meetupRSVPProvider = new MeetupRsvpJsonStream(true);
+    meetupRSVPProvider.run();
+    printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****");
+    runner.startAll();
+    // Each shutdown step runs on its own so that one failing step (e.g. stopping the cluster) does not prevent the
+    // remaining services from being stopped or the temporary directory from being deleted
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      printStatus(Color.GREEN, "***** Shutting down realtime quick start *****");
+      runShutdownStep(runner::stop);
+      runShutdownStep(meetupRSVPProvider::stopPublishing);
+      runShutdownStep(_kafkaStarter::stop);
+      runShutdownStep(() -> ZkStarter.stopLocalZkServer(zookeeperInstance));
+      runShutdownStep(() -> FileUtils.deleteDirectory(quickstartTmpDir));
+    }));
+    printStatus(Color.CYAN, "***** Bootstrap meetupRSVP table *****");
+    runner.bootstrapTable();
+    printStatus(Color.CYAN, "***** Waiting for 20 seconds for a few events to get populated *****");
+    Thread.sleep(20000);
+
+    printStatus(Color.YELLOW, "***** Upsert json-index quickstart setup complete *****");
+
+    // Sample query: extract the event name from event_json for rows whose group_json has a 'Fitness' topic
+    String q1 = "select json_extract_scalar(event_json, '$.event_name', 'STRING') from meetupRsvp where json_match"
+        + "(group_json, '\"$.group_topics[*].topic_name\"=''Fitness''') limit 10";
+    printStatus(Color.YELLOW, "Events related to fitness");
+    printStatus(Color.CYAN, "Query : " + q1);
+    printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
+
+    printStatus(Color.GREEN, "***************************************************");
+    printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console");
+  }
+
+  /** A single shutdown action that is allowed to fail with a checked exception. */
+  @FunctionalInterface
+  private interface ShutdownStep {
+    void run()
+        throws Exception;
+  }
+
+  /**
+   * Runs one shutdown step and prints the stack trace of a failure instead of propagating it, so that the steps
+   * following it in the shutdown hook still run.
+   */
+  private static void runShutdownStep(ShutdownStep step) {
+    try {
+      step.run();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickStartCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickStartCommand.java
index 7e116c8..92fd15d 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickStartCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickStartCommand.java
@@ -31,6 +31,7 @@ import org.apache.pinot.tools.RealtimeComplexTypeHandlingQuickStart;
 import org.apache.pinot.tools.RealtimeJsonIndexQuickStart;
 import org.apache.pinot.tools.RealtimeQuickStart;
 import org.apache.pinot.tools.RealtimeQuickStartWithMinion;
+import org.apache.pinot.tools.UpsertJsonQuickStart;
 import org.apache.pinot.tools.UpsertQuickStart;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
@@ -105,8 +106,8 @@ public class QuickStartCommand extends AbstractBaseAdminCommand implements Comma
       case "BATCH-MINION":
         quickstart = new BatchQuickstartWithMinion();
         break;
-      case "REALTIME-MINION":
       case "REALTIME_MINION":
+      case "REALTIME-MINION":
         quickstart = new RealtimeQuickStartWithMinion();
         break;
       case "REALTIME":
@@ -134,6 +135,10 @@ public class QuickStartCommand extends AbstractBaseAdminCommand implements Comma
       case "STREAM-JSON-INDEX":
         quickstart = new RealtimeJsonIndexQuickStart();
         break;
+      case "UPSERT_JSON_INDEX":
+      case "UPSERT-JSON-INDEX":
+        quickstart = new UpsertJsonQuickStart();
+        break;
       case "OFFLINE_COMPLEX_TYPE":
       case "OFFLINE-COMPLEX-TYPE":
       case "BATCH_COMPLEX_TYPE":
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
index 9a3a0ae..624d277 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.tools.streams;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import javax.websocket.MessageHandler;
+import org.apache.pinot.spi.utils.JsonUtils;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -30,11 +32,26 @@ public class MeetupRsvpJsonStream extends MeetupRsvpStream {
     super();
   }
 
+  public MeetupRsvpJsonStream(boolean partitionByKey)
+      throws Exception {
+    super(partitionByKey);
+  }
+
   @Override
   protected MessageHandler.Whole<String> getMessageHandler() {
     return message -> {
       if (_keepPublishing) {
-        _producer.produce("meetupRSVPEvents", message.getBytes(UTF_8));
+        if (_partitionByKey) {
+          try {
+            JsonNode messageJson = JsonUtils.stringToJsonNode(message);
+            String rsvpId = messageJson.get("rsvp_id").asText();
+            _producer.produce("meetupRSVPEvents", rsvpId.getBytes(UTF_8), message.getBytes(UTF_8));
+          } catch (Exception e) {
+            LOGGER.error("Caught exception while processing the message: {}", message, e);
+          }
+        } else {
+          _producer.produce("meetupRSVPEvents", message.getBytes(UTF_8));
+        }
       }
     };
   }
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json
new file mode 100644
index 0000000..149587e
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_realtime_table_config.json
@@ -0,0 +1,101 @@
+{
+  "tableName": "meetupRsvp",
+  "tableType": "REALTIME",
+  "tenants": {},
+  "segmentsConfig": {
+    "timeColumnName": "mtime",
+    "timeType": "MILLISECONDS",
+    "segmentPushType": "APPEND",
+    "replicasPerPartition": "1",
+    "retentionTimeUnit": "DAYS",
+    "retentionTimeValue": "1"
+  },
+  "ingestionConfig": {
+    "streamIngestionConfig": {
+      "streamConfigMaps": [
+        {
+          "streamType": "kafka",
+          "stream.kafka.consumer.type": "lowLevel",
+          "stream.kafka.topic.name": "meetupRSVPEvents",
+          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+          "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
+          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
+          "stream.kafka.zk.broker.url": "localhost:2191/kafka",
+          "stream.kafka.broker.list": "localhost:19092"
+        }
+      ]
+    },
+    "transformConfigs": [
+      {
+        "columnName": "event_json",
+        "transformFunction": "jsonFormat(event)"
+      },
+      {
+        "columnName": "group_json",
+        "transformFunction": "jsonFormat(\"group\")"
+      },
+      {
+        "columnName": "member_json",
+        "transformFunction": "jsonFormat(member)"
+      },
+      {
+        "columnName": "venue_json",
+        "transformFunction": "jsonFormat(venue)"
+      }
+    ]
+  },
+  "tableIndexConfig": {
+    "loadMode": "MMAP",
+    "noDictionaryColumns": [
+      "event_json",
+      "group_json",
+      "member_json",
+      "venue_json"
+    ],
+    "jsonIndexColumns": [
+      "event_json",
+      "group_json",
+      "member_json",
+      "venue_json"
+    ]
+  },
+  "fieldConfigList": [
+    {
+      "name": "event_json",
+      "encodingType": "RAW",
+      "indexTypes": [
+        "TEXT"
+      ]
+    },
+    {
+      "name": "group_json",
+      "encodingType": "RAW",
+      "indexTypes": [
+        "TEXT"
+      ]
+    },
+    {
+      "name": "member_json",
+      "encodingType": "RAW",
+      "indexTypes": [
+        "TEXT"
+      ]
+    },
+    {
+      "name": "venue_json",
+      "encodingType": "RAW",
+      "indexTypes": [
+        "TEXT"
+      ]
+    }
+  ],
+  "routing": {
+    "instanceSelectorType": "strictReplicaGroup"
+  },
+  "upsertConfig": {
+    "mode": "FULL"
+  },
+  "metadata": {
+    "customConfigs": {}
+  }
+}
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json
new file mode 100644
index 0000000..9112c73
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_json_meetupRsvp_schema.json
@@ -0,0 +1,50 @@
+{
+  "schemaName": "meetupRsvp",
+  "dimensionFieldSpecs": [
+    {
+      "name": "event_json",
+      "dataType": "JSON"
+    },
+    {
+      "name": "group_json",
+      "dataType": "JSON"
+    },
+    {
+      "name": "member_json",
+      "dataType": "JSON"
+    },
+    {
+      "name": "venue_json",
+      "dataType": "JSON"
+    },
+    {
+      "name": "response",
+      "dataType": "STRING"
+    },
+    {
+      "name": "rsvp_id",
+      "dataType": "LONG"
+    },
+    {
+      "name": "visibility",
+      "dataType": "STRING"
+    }
+  ],
+  "metricFieldSpecs": [
+    {
+      "name": "guests",
+      "dataType": "INT"
+    }
+  ],
+  "dateTimeFieldSpecs": [
+    {
+      "name": "mtime",
+      "dataType": "TIMESTAMP",
+      "format": "1:MILLISECONDS:TIMESTAMP",
+      "granularity": "1:MILLISECONDS"
+    }
+  ],
+  "primaryKeyColumns": [
+    "rsvp_id"
+  ]
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org