Posted to commits@pinot.apache.org by xi...@apache.org on 2019/11/20 08:01:16 UTC

[incubator-pinot] branch master updated: Adding example of pinot-quickstart with Avro encoded messages in Kafka (#4836)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2189c92  Adding example of pinot-quickstart with Avro encoded messages in Kafka (#4836)
2189c92 is described below

commit 2189c9273572401a85d5e9c69b3bc063328a8ec5
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed Nov 20 00:01:08 2019 -0800

    Adding example of pinot-quickstart with Avro encoded messages in Kafka (#4836)
    
    * Make StreamAvroIntoKafkaCommand also support dumping data in Avro format into Kafka
    
    * Update k8s quickstart to include avro ingestion
---
 kubernetes/helm/README.md                          | 35 ++++++++---
 kubernetes/helm/pinot-realtime-quickstart.yml      | 69 ++++++++++++++++++++--
 .../tools/admin/command/AddSchemaCommand.java      |  4 +-
 .../pinot/tools/admin/command/AddTableCommand.java |  5 +-
 .../admin/command/StreamAvroIntoKafkaCommand.java  | 28 +++++++--
 5 files changed, 121 insertions(+), 20 deletions(-)

diff --git a/kubernetes/helm/README.md b/kubernetes/helm/README.md
index a9b37d5..6227184 100644
--- a/kubernetes/helm/README.md
+++ b/kubernetes/helm/README.md
@@ -185,6 +185,15 @@ helm dependency update
 
 ### Start Pinot with Helm
 
+- For helm v3.0.0
+
+```bash
+kubectl create ns pinot-quickstart
+helm install -n pinot-quickstart pinot .
+```
+
+- For helm v2.12.1
+
 If cluster is just initialized, ensure helm is initialized by running:
 
 ```bash
@@ -197,7 +206,7 @@ Then deploy pinot cluster by:
 helm install --namespace "pinot-quickstart" --name "pinot" .
 ```
 
-#### Troubleshooting
+#### Troubleshooting (For helm v2.12.1)
 - Error: Please run below command if encountering issue:
 
 ```
@@ -232,6 +241,15 @@ kubectl get all -n pinot-quickstart
 
 #### Bring up a Kafka Cluster for realtime data ingestion
 
+- For helm v3.0.0
+
+```bash
+helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
+helm install -n pinot-quickstart kafka incubator/kafka
+```
+
+- For helm v2.12.1
+
 ```bash
 helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
 helm install --namespace "pinot-quickstart"  --name kafka incubator/kafka
@@ -241,6 +259,7 @@ helm install --namespace "pinot-quickstart"  --name kafka incubator/kafka
 
 ```bash
 kubectl -n pinot-quickstart exec kafka-0 -- kafka-topics --zookeeper kafka-zookeeper:2181 --topic flights-realtime --create --partitions 1 --replication-factor 1
+kubectl -n pinot-quickstart exec kafka-0 -- kafka-topics --zookeeper kafka-zookeeper:2181 --topic flights-realtime-avro --create --partitions 1 --replication-factor 1
 ```
 
 #### Load data into Kafka and create Pinot schema/table
@@ -257,14 +276,6 @@ Please use below script to do local port-forwarding and open Pinot query console
 ./query-pinot-data.sh
 ```
 
-### How to clean up Pinot deployment
-
-```bash
-kubectl delete -f pinot-realtime-quickstart.yml
-helm del --purge kafka
-helm del --purge pinot
-```
-
 ## Configuring the Chart
 
 This chart includes a ZooKeeper chart as a dependency to the Pinot
@@ -485,3 +496,9 @@ Query 20191112_051114_00006_xkm4g, FINISHED, 1 node
 Splits: 17 total, 17 done (100.00%)
 0:00 [1 rows, 8B] [2 rows/s, 19B/s]
 ```
+
+## How to clean up Pinot deployment
+
+```bash
+kubectl delete ns pinot-quickstart
+```
\ No newline at end of file
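
For reference, the two topics created via `kubectl exec` above can also be created programmatically. Below is a minimal Java sketch, assuming the `kafka-clients` library is on the classpath and the in-cluster bootstrap address `kafka:9092` from this quickstart; the class name `CreateQuickstartTopics` is illustrative, not part of this commit.

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateQuickstartTopics {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    try (AdminClient admin = AdminClient.create(props)) {
      // One partition, replication factor 1, matching the kafka-topics flags above
      admin.createTopics(Arrays.asList(
          new NewTopic("flights-realtime", 1, (short) 1),
          new NewTopic("flights-realtime-avro", 1, (short) 1)))
          .all().get(); // block until both topics exist
    }
  }
}
```
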
diff --git a/kubernetes/helm/pinot-realtime-quickstart.yml b/kubernetes/helm/pinot-realtime-quickstart.yml
index a121b11..3885d2c 100644
--- a/kubernetes/helm/pinot-realtime-quickstart.yml
+++ b/kubernetes/helm/pinot-realtime-quickstart.yml
@@ -63,6 +63,47 @@ data:
       }
     }
 
+  airlineStatsAvro_realtime_table_config.json: |-
+    {
+      "tableName": "airlineStatsAvro",
+      "tableType": "REALTIME",
+      "segmentsConfig": {
+        "timeColumnName": "DaysSinceEpoch",
+        "timeType": "DAYS",
+        "retentionTimeUnit": "DAYS",
+        "retentionTimeValue": "3650",
+        "segmentPushType": "APPEND",
+        "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+        "schemaName": "airlineStats",
+        "replication": "1",
+        "replicasPerPartition": "1"
+      },
+      "tenants": {
+        "broker": "airline_broker",
+        "server": "airline"
+      },
+      "tableIndexConfig": {
+        "loadMode": "MMAP",
+        "streamConfigs": {
+          "streamType": "kafka",
+          "stream.kafka.consumer.type": "simple",
+          "stream.kafka.topic.name": "flights-realtime-avro",
+          "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.stream.SimpleAvroMessageDecoder",
+          "stream.kafka.decoder.prop.schema": "{\"type\":\"record\",\"name\":\"Flight\",\"namespace\":\"pinot\",\"fields\":[{\"name\":\"DaysSinceEpoch\",\"type\":[\"int\"]},{\"name\":\"Year\",\"type\":[\"int\"]},{\"name\":\"Quarter\",\"type\":[\"int\"]},{\"name\":\"Month\",\"type\":[\"int\"]},{\"name\":\"DayofMonth\",\"type\":[\"int\"]},{\"name\":\"DayOfWeek\",\"type\":[\"int\"]},{\"name\":\"FlightDate\",\"type\":[\"string\"]},{\"name\":\"UniqueCarrier\",\"type\":[\"string\"]},{\"name\": [...]
+          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
+          "stream.kafka.hlc.zk.connect.string": "kafka-zookeeper:2181",
+          "stream.kafka.zk.broker.url": "kafka-zookeeper:2181",
+          "stream.kafka.broker.list": "kafka:9092",
+          "realtime.segment.flush.threshold.time": "3600000",
+          "realtime.segment.flush.threshold.size": "50000",
+          "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
+        }
+      },
+      "metadata": {
+        "customConfigs": {}
+      }
+    }
+
   airlineStats_schema.json: |-
     {
       "metricFieldSpecs": [
@@ -421,8 +462,11 @@ spec:
     spec:
       containers:
         - name: pinot-add-example-schema
-          image: winedepot/pinot:0.1.13-SNAPSHOT
+          image: fx19880617/pinot:0.2.0-SNAPSHOT
           args: [ "AddSchema", "-schemaFile", "/var/pinot/examples/airlineStats_schema.json", "-controllerHost", "pinot-controller", "-controllerPort", "9000", "-exec" ]
+          env:
+            - name: JAVA_OPTS
+              value: "-Xms4G -Xmx4G -Dpinot.admin.system.exit=true"
           volumeMounts:
             - name: examples
               mountPath: /var/pinot/examples
@@ -442,9 +486,21 @@ spec:
   template:
     spec:
       containers:
-        - name: pinot-add-example-realtime-table
-          image: winedepot/pinot:0.1.13-SNAPSHOT
+        - name: pinot-add-example-realtime-table-json
+          image: fx19880617/pinot:0.2.0-SNAPSHOT
           args: [ "AddTable", "-filePath", "/var/pinot/examples/airlineStats_realtime_table_config.json", "-controllerHost", "pinot-controller", "-controllerPort", "9000", "-exec" ]
+          env:
+            - name: JAVA_OPTS
+              value: "-Xms4G -Xmx4G -Dpinot.admin.system.exit=true"
+          volumeMounts:
+            - name: examples
+              mountPath: /var/pinot/examples
+        - name: pinot-add-example-realtime-table-avro
+          image: fx19880617/pinot:0.2.0-SNAPSHOT
+          args: [ "AddTable", "-filePath", "/var/pinot/examples/airlineStatsAvro_realtime_table_config.json", "-controllerHost", "pinot-controller", "-controllerPort", "9000", "-exec" ]
+          env:
+            - name: JAVA_OPTS
+              value: "-Xms4G -Xmx4G -Dpinot.admin.system.exit=true"
           volumeMounts:
             - name: examples
               mountPath: /var/pinot/examples
@@ -464,9 +520,12 @@ spec:
   template:
     spec:
       containers:
-        - name: loading-data-to-kafka
-          image: winedepot/pinot:0.1.13-SNAPSHOT
+        - name: loading-json-data-to-kafka
+          image: fx19880617/pinot:0.2.0-SNAPSHOT
           args: [ "StreamAvroIntoKafka", "-avroFile", "sample_data/airlineStats_data.avro", "-kafkaTopic", "flights-realtime", "-kafkaBrokerList", "kafka:9092", "-zkAddress", "kafka-zookeeper:2181" ]
+        - name: loading-avro-data-to-kafka
+          image: fx19880617/pinot:0.2.0-SNAPSHOT
+          args: [ "StreamAvroIntoKafka", "-avroFile", "sample_data/airlineStats_data.avro", "-kafkaTopic", "flights-realtime-avro", "-kafkaBrokerList", "kafka:9092", "-zkAddress", "kafka-zookeeper:2181", "-outputFormat", "avro" ]
       restartPolicy: OnFailure
   backoffLimit: 3
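
The table config above wires the new topic to `org.apache.pinot.core.realtime.stream.SimpleAvroMessageDecoder` and passes the writer schema inline via `stream.kafka.decoder.prop.schema`. As a rough illustration of what a schema-aware consumer of `flights-realtime-avro` must do, here is a minimal sketch, not Pinot's decoder implementation; `schemaJson` (the string from the config) and `payload` (one raw Kafka message value) are placeholders:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class DecodeFlightMessage {
  // Decode one Avro-binary Kafka message using the writer schema from the table config
  public static GenericRecord decode(String schemaJson, byte[] payload) throws Exception {
    Schema schema = new Schema.Parser().parse(schemaJson);
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
    return reader.read(null, decoder); // one record per Kafka message
  }
}
```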
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
index f0df976..3eb9215 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
@@ -120,8 +120,10 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
       fileUploadDownloadClient.addSchema(
           FileUploadDownloadClient.getUploadSchemaHttpURI(_controllerHost, Integer.parseInt(_controllerPort)),
           schema.getSchemaName(), schemaFile);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while uploading Pinot schema: " + schema.getSchemaName(), e);
+      return false;
     }
-
     return true;
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
index bc57050..4c3b6ce 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
@@ -120,7 +120,10 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
         .sendPostRequest(ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableCreate(), node.toString());
 
     LOGGER.info(res);
-    return true;
+    // Note: "succesfully" (sic) deliberately matches the controller's response text
+    return res.contains("succesfully added");
   }
 
   @Override
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
index 66ae944..495f2e7 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
@@ -20,12 +20,17 @@ package org.apache.pinot.tools.admin.command;
 
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
 import org.apache.pinot.core.util.AvroUtils;
@@ -57,9 +62,13 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
   @Option(name = "-zkAddress", required = false, metaVar = "<string>", usage = "Address of Zookeeper.")
   private String _zkAddress = "localhost:2181";
 
+  @Option(name = "-outputFormat", required = false, metaVar = "<string>", usage = "Data format to produce to Kafka, supported: json(default) and avro")
+  private String _outputFormat = "json";
+
   @Option(name = "-millisBetweenMessages", required = false, metaVar = "<int>", usage = "Delay in milliseconds between messages (default 1000 ms)")
   private String _millisBetweenMessages = "1000";
 
   @Override
   public boolean getHelp() {
     return _help;
@@ -73,7 +82,7 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
   @Override
   public String toString() {
     return "StreamAvroInfoKafka -avroFile " + _avroFile + " -kafkaBrokerList " + _kafkaBrokerList + " -kafkaTopic "
-        + _kafkaTopic + " -millisBetweenMessages " + _millisBetweenMessages;
+        + _kafkaTopic + " -outputFormat " + _outputFormat + " -millisBetweenMessages " + _millisBetweenMessages;
   }
 
   @Override
@@ -111,12 +120,23 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
     try {
       // Open the Avro file
       DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(new File(_avroFile));
-
+      DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(reader.getSchema());
       // Iterate over every record
       for (GenericRecord genericRecord : reader) {
+        byte[] bytes;
+        switch (_outputFormat) {
+          case "avro":
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
+            datumWriter.write(genericRecord, encoder);
+            encoder.flush();
+            bytes = outputStream.toByteArray();
+            break;
+          default:
+            String recordJson = genericRecord.toString();
+            bytes = recordJson.getBytes("utf-8");
+        }
         // Write the message to Kafka
-        String recordJson = genericRecord.toString();
-        byte[] bytes = recordJson.getBytes("utf-8");
         streamDataProducer.produce(_kafkaTopic, Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)), bytes);
 
         // Sleep between messages
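
The `avro` branch above serializes each `GenericRecord` with a `GenericDatumWriter` and a `BinaryEncoder` instead of writing the record's JSON string. Below is a self-contained round-trip sketch of that encoding path, using a one-field stand-in for the Flight schema in the table config (the value 18220 is an arbitrary sample):

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroRoundTrip {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Flight\",\"fields\":"
            + "[{\"name\":\"DaysSinceEpoch\",\"type\":\"int\"}]}");
    GenericRecord record = new GenericData.Record(schema);
    record.put("DaysSinceEpoch", 18220);

    // Encode: the same calls as the "avro" case in the switch above
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
    encoder.flush();
    byte[] bytes = out.toByteArray();

    // Decode and check the field survives the round trip
    GenericRecord decoded = new GenericDatumReader<GenericRecord>(schema)
        .read(null, DecoderFactory.get().binaryDecoder(bytes, null));
    System.out.println(decoded.get("DaysSinceEpoch")); // prints 18220
  }
}
```

Because Avro's binary encoding carries no schema, the consuming side must be handed the same writer schema out of band, which is why the table config embeds it in `stream.kafka.decoder.prop.schema` for the decoder.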


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org