You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2019/11/20 08:01:16 UTC
[incubator-pinot] branch master updated: Adding example of
pinot-quickstart with Avro encoded messages in Kafka (#4836)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2189c92 Adding example of pinot-quickstart with Avro encoded messages in Kafka (#4836)
2189c92 is described below
commit 2189c9273572401a85d5e9c69b3bc063328a8ec5
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed Nov 20 00:01:08 2019 -0800
Adding example of pinot-quickstart with Avro encoded messages in Kafka (#4836)
* Make StreamAvroIntoKafkaCommand also support dumping data in Avro format into Kafka
* Update k8s quickstart to include avro ingestion
---
kubernetes/helm/README.md | 35 ++++++++---
kubernetes/helm/pinot-realtime-quickstart.yml | 69 ++++++++++++++++++++--
.../tools/admin/command/AddSchemaCommand.java | 4 +-
.../pinot/tools/admin/command/AddTableCommand.java | 5 +-
.../admin/command/StreamAvroIntoKafkaCommand.java | 28 +++++++--
5 files changed, 121 insertions(+), 20 deletions(-)
diff --git a/kubernetes/helm/README.md b/kubernetes/helm/README.md
index a9b37d5..6227184 100644
--- a/kubernetes/helm/README.md
+++ b/kubernetes/helm/README.md
@@ -185,6 +185,15 @@ helm dependency update
### Start Pinot with Helm
+- For helm v3.0.0
+
+```bash
+kubectl create ns pinot-quickstart
+helm install -n pinot-quickstart pinot .
+```
+
+- For helm v2.12.1
+
If cluster is just initialized, ensure helm is initialized by running:
```bash
@@ -197,7 +206,7 @@ Then deploy pinot cluster by:
helm install --namespace "pinot-quickstart" --name "pinot" .
```
-#### Troubleshooting
+#### Troubleshooting (For helm v2.12.1)
- Error: Please run below command if encountering issue:
```
@@ -232,6 +241,15 @@ kubectl get all -n pinot-quickstart
#### Bring up a Kafka Cluster for realtime data ingestion
+- For helm v3.0.0
+
+```bash
+helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
+helm install -n pinot-quickstart kafka incubator/kafka
+```
+
+- For helm v2.12.1
+
```bash
helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
helm install --namespace "pinot-quickstart" --name kafka incubator/kafka
@@ -241,6 +259,7 @@ helm install --namespace "pinot-quickstart" --name kafka incubator/kafka
```bash
kubectl -n pinot-quickstart exec kafka-0 -- kafka-topics --zookeeper kafka-zookeeper:2181 --topic flights-realtime --create --partitions 1 --replication-factor 1
+kubectl -n pinot-quickstart exec kafka-0 -- kafka-topics --zookeeper kafka-zookeeper:2181 --topic flights-realtime-avro --create --partitions 1 --replication-factor 1
```
#### Load data into Kafka and create Pinot schema/table
@@ -257,14 +276,6 @@ Please use below script to do local port-forwarding and open Pinot query console
./query-pinot-data.sh
```
-### How to clean up Pinot deployment
-
-```bash
-kubectl delete -f pinot-realtime-quickstart.yml
-helm del --purge kafka
-helm del --purge pinot
-```
-
## Configuring the Chart
This chart includes a ZooKeeper chart as a dependency to the Pinot
@@ -485,3 +496,9 @@ Query 20191112_051114_00006_xkm4g, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:00 [1 rows, 8B] [2 rows/s, 19B/s]
```
+
+## How to clean up Pinot deployment
+
+```bash
+kubectl delete ns pinot-quickstart
+```
\ No newline at end of file
diff --git a/kubernetes/helm/pinot-realtime-quickstart.yml b/kubernetes/helm/pinot-realtime-quickstart.yml
index a121b11..3885d2c 100644
--- a/kubernetes/helm/pinot-realtime-quickstart.yml
+++ b/kubernetes/helm/pinot-realtime-quickstart.yml
@@ -63,6 +63,47 @@ data:
}
}
+ airlineStatsAvro_realtime_table_config.json: |-
+ {
+ "tableName": "airlineStatsAvro",
+ "tableType": "REALTIME",
+ "segmentsConfig": {
+ "timeColumnName": "DaysSinceEpoch",
+ "timeType": "DAYS",
+ "retentionTimeUnit": "DAYS",
+ "retentionTimeValue": "3650",
+ "segmentPushType": "APPEND",
+ "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+ "schemaName": "airlineStats",
+ "replication": "1",
+ "replicasPerPartition": "1"
+ },
+ "tenants": {
+ "broker": "airline_broker",
+ "server": "airline"
+ },
+ "tableIndexConfig": {
+ "loadMode": "MMAP",
+ "streamConfigs": {
+ "streamType": "kafka",
+ "stream.kafka.consumer.type": "simple",
+ "stream.kafka.topic.name": "flights-realtime-avro",
+ "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.stream.SimpleAvroMessageDecoder",
+ "stream.kafka.decoder.prop.schema": "{\"type\":\"record\",\"name\":\"Flight\",\"namespace\":\"pinot\",\"fields\":[{\"name\":\"DaysSinceEpoch\",\"type\":[\"int\"]},{\"name\":\"Year\",\"type\":[\"int\"]},{\"name\":\"Quarter\",\"type\":[\"int\"]},{\"name\":\"Month\",\"type\":[\"int\"]},{\"name\":\"DayofMonth\",\"type\":[\"int\"]},{\"name\":\"DayOfWeek\",\"type\":[\"int\"]},{\"name\":\"FlightDate\",\"type\":[\"string\"]},{\"name\":\"UniqueCarrier\",\"type\":[\"string\"]},{\"name\": [...]
+ "stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
+ "stream.kafka.hlc.zk.connect.string": "kafka-zookeeper:2181",
+ "stream.kafka.zk.broker.url": "kafka-zookeeper:2181",
+ "stream.kafka.broker.list": "kafka:9092",
+ "realtime.segment.flush.threshold.time": "3600000",
+ "realtime.segment.flush.threshold.size": "50000",
+ "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
+ }
+ },
+ "metadata": {
+ "customConfigs": {}
+ }
+ }
+
airlineStats_schema.json: |-
{
"metricFieldSpecs": [
@@ -421,8 +462,11 @@ spec:
spec:
containers:
- name: pinot-add-example-schema
- image: winedepot/pinot:0.1.13-SNAPSHOT
+ image: fx19880617/pinot:0.2.0-SNAPSHOT
args: [ "AddSchema", "-schemaFile", "/var/pinot/examples/airlineStats_schema.json", "-controllerHost", "pinot-controller", "-controllerPort", "9000", "-exec" ]
+ env:
+ - name: JAVA_OPTS
+ value: "-Xms4G -Xmx4G -Dpinot.admin.system.exit=true"
volumeMounts:
- name: examples
mountPath: /var/pinot/examples
@@ -442,9 +486,21 @@ spec:
template:
spec:
containers:
- - name: pinot-add-example-realtime-table
- image: winedepot/pinot:0.1.13-SNAPSHOT
+ - name: pinot-add-example-realtime-table-json
+ image: fx19880617/pinot:0.2.0-SNAPSHOT
args: [ "AddTable", "-filePath", "/var/pinot/examples/airlineStats_realtime_table_config.json", "-controllerHost", "pinot-controller", "-controllerPort", "9000", "-exec" ]
+ env:
+ - name: JAVA_OPTS
+ value: "-Xms4G -Xmx4G -Dpinot.admin.system.exit=true"
+ volumeMounts:
+ - name: examples
+ mountPath: /var/pinot/examples
+ - name: pinot-add-example-realtime-table-avro
+ image: fx19880617/pinot:0.2.0-SNAPSHOT
+ args: [ "AddTable", "-filePath", "/var/pinot/examples/airlineStatsAvro_realtime_table_config.json", "-controllerHost", "pinot-controller", "-controllerPort", "9000", "-exec" ]
+ env:
+ - name: JAVA_OPTS
+ value: "-Xms4G -Xmx4G -Dpinot.admin.system.exit=true"
volumeMounts:
- name: examples
mountPath: /var/pinot/examples
@@ -464,9 +520,12 @@ spec:
template:
spec:
containers:
- - name: loading-data-to-kafka
- image: winedepot/pinot:0.1.13-SNAPSHOT
+ - name: loading-json-data-to-kafka
+ image: fx19880617/pinot:0.2.0-SNAPSHOT
args: [ "StreamAvroIntoKafka", "-avroFile", "sample_data/airlineStats_data.avro", "-kafkaTopic", "flights-realtime", "-kafkaBrokerList", "kafka:9092", "-zkAddress", "kafka-zookeeper:2181" ]
+ - name: loading-avro-data-to-kafka
+ image: fx19880617/pinot:0.2.0-SNAPSHOT
+ args: [ "StreamAvroIntoKafka", "-avroFile", "sample_data/airlineStats_data.avro", "-kafkaTopic", "flights-realtime-avro", "-kafkaBrokerList", "kafka:9092", "-zkAddress", "kafka-zookeeper:2181", "-outputFormat", "avro" ]
restartPolicy: OnFailure
backoffLimit: 3
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
index f0df976..3eb9215 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java
@@ -120,8 +120,10 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman
fileUploadDownloadClient.addSchema(
FileUploadDownloadClient.getUploadSchemaHttpURI(_controllerHost, Integer.parseInt(_controllerPort)),
schema.getSchemaName(), schemaFile);
+ } catch (Exception e) {
+ LOGGER.error("Got Exception to upload Pinot Schema: " + schema.getSchemaName(), e);
+ return false;
}
-
return true;
}
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
index bc57050..4c3b6ce 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java
@@ -120,7 +120,10 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command
.sendPostRequest(ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableCreate(), node.toString());
LOGGER.info(res);
- return true;
+ if (res.contains("succesfully added")) {
+ return true;
+ }
+ return false;
}
@Override
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
index 66ae944..495f2e7 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
@@ -20,12 +20,17 @@ package org.apache.pinot.tools.admin.command;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
import org.apache.pinot.core.util.AvroUtils;
@@ -57,9 +62,13 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
@Option(name = "-zkAddress", required = false, metaVar = "<string>", usage = "Address of Zookeeper.")
private String _zkAddress = "localhost:2181";
+ @Option(name = "-outputFormat", required = false, metaVar = "<string>", usage = "Data format to produce to Kafka, supported: json(default) and avro")
+ private String _outputFormat = "json";
+
@Option(name = "-millisBetweenMessages", required = false, metaVar = "<int>", usage = "Delay in milliseconds between messages (default 1000 ms)")
private String _millisBetweenMessages = "1000";
+
@Override
public boolean getHelp() {
return _help;
@@ -73,7 +82,7 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
@Override
public String toString() {
return "StreamAvroInfoKafka -avroFile " + _avroFile + " -kafkaBrokerList " + _kafkaBrokerList + " -kafkaTopic "
- + _kafkaTopic + " -millisBetweenMessages " + _millisBetweenMessages;
+ + _kafkaTopic + "-outputFormat" + _outputFormat + " -millisBetweenMessages " + _millisBetweenMessages;
}
@Override
@@ -111,12 +120,23 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
try {
// Open the Avro file
DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(new File(_avroFile));
-
+ DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(reader.getSchema());
// Iterate over every record
for (GenericRecord genericRecord : reader) {
+ byte[] bytes;
+ switch (_outputFormat) {
+ case "avro":
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
+ datumWriter.write(genericRecord, encoder);
+ encoder.flush();
+ bytes = outputStream.toByteArray();
+ break;
+ default:
+ String recordJson = genericRecord.toString();
+ bytes = recordJson.getBytes("utf-8");
+ }
// Write the message to Kafka
- String recordJson = genericRecord.toString();
- byte[] bytes = recordJson.getBytes("utf-8");
streamDataProducer.produce(_kafkaTopic, Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)), bytes);
// Sleep between messages
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org