Posted to commits@druid.apache.org by "vtlim (via GitHub)" <gi...@apache.org> on 2023/05/01 18:58:22 UTC

[GitHub] [druid] vtlim commented on a diff in pull request #13984: Docs: Tutorial for streaming ingestion using Kafka + Docker file to use with Jupyter tutorials

vtlim commented on code in PR #13984:
URL: https://github.com/apache/druid/pull/13984#discussion_r1181804021


##########
examples/quickstart/jupyter-notebooks/kafka-tutorial.ipynb:
##########
@@ -0,0 +1,753 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Tutorial: Ingest and query data from Apache Kafka\n",
+    "\n",
+    "<!--\n",
+    "  ~ Licensed to the Apache Software Foundation (ASF) under one\n",
+    "  ~ or more contributor license agreements.  See the NOTICE file\n",
+    "  ~ distributed with this work for additional information\n",
+    "  ~ regarding copyright ownership.  The ASF licenses this file\n",
+    "  ~ to you under the Apache License, Version 2.0 (the\n",
+    "  ~ \"License\"); you may not use this file except in compliance\n",
+    "  ~ with the License.  You may obtain a copy of the License at\n",
+    "  ~\n",
+    "  ~   http://www.apache.org/licenses/LICENSE-2.0\n",
+    "  ~\n",
+    "  ~ Unless required by applicable law or agreed to in writing,\n",
+    "  ~ software distributed under the License is distributed on an\n",
+    "  ~ \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+    "  ~ KIND, either express or implied.  See the License for the\n",
+    "  ~ specific language governing permissions and limitations\n",
+    "  ~ under the License.\n",
+    "  -->\n",
+    "\n",
+    "This tutorial introduces you to streaming ingestion in Apache Druid using the Apache Kafka event streaming platform.\n",
+    "Follow along to learn how to create and load data into a Kafka topic, start ingesting data from the topic into Druid, and query results over time. This tutorial assumes you have a basic understanding of Druid ingestion, querying, and API requests."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Table of contents\n",
+    "\n",
+    "* [Prerequisites](#Prerequisites)\n",
+    "* [Load Druid API client](#Load-Druid-API-client)\n",
+    "* [Create Kafka topic](#Create-Kafka-topic)\n",
+    "* [Load data into Kafka topic](#Load-data-into-Kafka-topic)\n",
+    "* [Start Druid ingestion](#Start-Druid-ingestion)\n",
+    "* [Query Druid datasource and visualize query results](#Query-Druid-datasource-and-visualize-query-results)\n",
+    "* [Learn more](#Learn-more)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Prerequisites\n",
+    "\n",
+    "Launch this tutorial and all prerequisites using the `all-services` profile of the Docker Compose file for Jupyter-based Druid tutorials. For more information, see [Docker for Jupyter Notebook tutorials](https://druid.apache.org/docs/latest/tutorials/tutorial-jupyter-docker.html).\n",
+    "\n",
+    "Otherwise, you need the following:\n",
+    "* A running Druid instance.\n",
+    "   * Update the `druid_host` variable to point to your Router endpoint. For example, `druid_host = \"http://localhost:8888\"`.\n",
+    "* A running Kafka cluster.\n",
+    "   * Update the Kafka bootstrap servers to point to your servers. For example, `bootstrap_servers=[\"localhost:9092\"]`.\n",
+    "* The following Python packages:\n",
+    "   * `druidapi`, a Python client for Apache Druid\n",
+    "   * `DruidDataDriver`, a data generator\n",
+    "   * `kafka`, a Python client for Apache Kafka\n",
+    "   * `pandas`, `matplotlib`, and `seaborn` for data visualization\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Load Druid API client"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "To start the tutorial, run the following cell. It imports the required Python packages and defines a variable for the Druid client, and another for the SQL client used to run SQL commands."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/html": [
+       "\n",
+       "<style>\n",
+       "  .druid table {\n",
+       "    border: 1px solid black;\n",
+       "    border-collapse: collapse;\n",
+       "  }\n",
+       "\n",
+       "  .druid th, .druid td {\n",
+       "    padding: 4px 1em ;\n",
+       "    text-align: left;\n",
+       "  }\n",
+       "\n",
+       "  td.druid-right, th.druid-right {\n",
+       "    text-align: right;\n",
+       "  }\n",
+       "\n",
+       "  td.druid-center, th.druid-center {\n",
+       "    text-align: center;\n",
+       "  }\n",
+       "\n",
+       "  .druid .druid-left {\n",
+       "    text-align: left;\n",
+       "  }\n",
+       "\n",
+       "  .druid-alert {\n",
+       "    font-weight: bold;\n",
+       "  }\n",
+       "\n",
+       "  .druid-error {\n",
+       "    color: red;\n",
+       "  }\n",
+       "</style>\n"
+      ],
+      "text/plain": [
+       "<IPython.core.display.HTML object>"
+      ]
+     },
+     "metadata": {},
+     "output_type": "display_data"
+    }
+   ],
+   "source": [
+    "import druidapi\n",
+    "import json\n",
+    "\n",
+    "# druid_host is the hostname and port for your Druid deployment. \n",
+    "# In a distributed environment, you can point to other Druid services.\n",
+    "# In this tutorial, you'll use the Router service as the `druid_host`.\n",
+    "druid_host = \"http://router:8888\"\n",
+    "\n",
+    "druid = druidapi.jupyter_client(druid_host)\n",
+    "display = druid.display\n",
+    "sql_client = druid.sql\n",
+    "\n",
+    "# Create a REST client for native JSON ingestion of streaming data\n",
+    "rest_client = druidapi.rest.DruidRestClient(\"http://coordinator:8081\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Create Kafka topic"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "This notebook relies on the Python client for Apache Kafka. Import the Kafka producer and consumer modules, then create a Kafka client. You use the Kafka producer to create and publish records to a new topic named `social_media`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from kafka import KafkaProducer\n",
+    "from kafka import KafkaConsumer\n",
+    "\n",
+    "# Kafka runs on kafka:9092 in the multi-container tutorial application\n",
+    "producer = KafkaProducer(bootstrap_servers='kafka:9092')\n",
+    "topic_name = \"social_media\""
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Create the `social_media` topic and send a sample event. The `send()` method returns a future that resolves to the metadata for the record."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "<kafka.producer.future.FutureRecordMetadata at 0x7f0d904e4940>"
+      ]
+     },
+     "execution_count": 3,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "event = {\n",
+    "    \"__time\": \"2023-01-03T16:40:21.501\",\n",
+    "    \"username\": \"willow_bean\",\n",
+    "    \"post_title\": \"This title is required\",\n",
+    "    \"views\": 15284,\n",
+    "    \"upvotes\": 124,\n",
+    "    \"comments\": 21,\n",
+    "    \"edited\": \"True\"\n",
+    "}\n",
+    "\n",
+    "producer.send(topic_name, json.dumps(event).encode('utf-8'))"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "To verify that the Kafka topic stored the event, create a consumer client to read records from the Kafka cluster, and get the next (only) message:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "{\"__time\": \"2023-01-03T16:40:21.501\", \"username\": \"willow_bean\", \"post_title\": \"This title is required\", \"views\": 15284, \"upvotes\": 124, \"comments\": 21, \"edited\": \"True\"}\n"
+     ]
+    }
+   ],
+   "source": [
+    "consumer = KafkaConsumer(topic_name, bootstrap_servers=['kafka:9092'], auto_offset_reset='earliest',\n",
+    "     enable_auto_commit=True)\n",
+    "\n",
+    "print(next(consumer).value.decode('utf-8'))"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Load data into Kafka topic"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Instead of manually creating events to send to the Kafka topic, use a data generator to simulate a continuous data stream. This tutorial uses Druid Data Driver to stream generated events into the `social_media` Kafka topic. To learn more about Druid Data Driver, see the Druid Summit talk, [Generating Time centric Data for Apache Druid](https://www.youtube.com/watch?v=3zAOeLe3iAo).\n",
+    "\n",
+    "In this notebook, you use a background process to continuously load data into the Kafka topic.\n",
+    "This allows you to keep executing commands in this notebook while data is constantly being streamed into the topic."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Run the following cells to load sample data into the `social_media` Kafka topic:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import multiprocessing as mp\n",
+    "from datetime import datetime\n",
+    "import DruidDataDriver"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def run_driver():\n",
+    "    DruidDataDriver.simulate(\"kafka_docker_config.json\", None, None, \"REAL\", datetime.now())\n",
+    "        \n",
+    "mp.set_start_method('fork')\n",
+    "ps = mp.Process(target=run_driver)\n",
+    "ps.start()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Start Druid ingestion"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Now that you have a new Kafka topic and data being streamed into the topic, you ingest the data into Druid by submitting a Kafka ingestion spec.\n",
+    "For more information about Kafka ingestion in Druid, see [Apache Kafka ingestion](https://druid.apache.org/docs/latest/development/extensions-core/kafka-ingestion.html).\n",
+    "\n",
+    "Run the following cells to define and submit the Kafka ingestion spec."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "kafka_ingestion_spec = \"{\\\"type\\\": \\\"kafka\\\",\\\"spec\\\": {\\\"ioConfig\\\": {\\\"type\\\": \\\"kafka\\\",\\\"consumerProperties\\\": {\\\"bootstrap.servers\\\": \\\"kafka:9092\\\"},\\\"topic\\\": \\\"social_media\\\",\\\"inputFormat\\\": {\\\"type\\\": \\\"json\\\"},\\\"useEarliestOffset\\\": true},\\\"tuningConfig\\\": {\\\"type\\\": \\\"kafka\\\"},\\\"dataSchema\\\": {\\\"dataSource\\\": \\\"social_media\\\",\\\"timestampSpec\\\": {\\\"column\\\": \\\"__time\\\",\\\"format\\\": \\\"iso\\\"},\\\"dimensionsSpec\\\": {\\\"dimensions\\\": [\\\"username\\\",\\\"post_title\\\",{\\\"type\\\": \\\"long\\\",\\\"name\\\": \\\"views\\\"},{\\\"type\\\": \\\"long\\\",\\\"name\\\": \\\"upvotes\\\"},{\\\"type\\\": \\\"long\\\",\\\"name\\\": \\\"comments\\\"},\\\"edited\\\"]},\\\"granularitySpec\\\": {\\\"queryGranularity\\\": \\\"none\\\",\\\"rollup\\\": false,\\\"segmentGranularity\\\": \\\"hour\\\"}}}}\""

Review Comment:
   I think for the tutorial it's easier to understand the example data (what it's made of and what's being sent to Druid) in JSON format since we're not doing anything with the Kafka metadata fields. 
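
For illustration only, here is a minimal sketch of how the same spec might read if the notebook built it as a Python dict and serialized it with `json.dumps`, rather than as one escaped string. The field values are copied from the escaped string in the diff above. The name `kafka_ingestion_spec_dict` is hypothetical and not part of the PR.

```python
import json

# Hypothetical variable name; the fields mirror the escaped string in the notebook cell.
kafka_ingestion_spec_dict = {
    "type": "kafka",
    "spec": {
        "ioConfig": {
            "type": "kafka",
            "consumerProperties": {"bootstrap.servers": "kafka:9092"},
            "topic": "social_media",
            # Plain JSON input format: the Kafka metadata fields are not used.
            "inputFormat": {"type": "json"},
            "useEarliestOffset": True,
        },
        "tuningConfig": {"type": "kafka"},
        "dataSchema": {
            "dataSource": "social_media",
            "timestampSpec": {"column": "__time", "format": "iso"},
            "dimensionsSpec": {
                "dimensions": [
                    "username",
                    "post_title",
                    {"type": "long", "name": "views"},
                    {"type": "long", "name": "upvotes"},
                    {"type": "long", "name": "comments"},
                    "edited",
                ]
            },
            "granularitySpec": {
                "queryGranularity": "none",
                "rollup": False,
                "segmentGranularity": "hour",
            },
        },
    },
}

# Serialize to the JSON string form that the notebook currently hard-codes.
kafka_ingestion_spec = json.dumps(kafka_ingestion_spec_dict)
```

Written this way, the dimensions and input format are visible at a glance, and Python's `True`/`False` serialize to JSON `true`/`false` automatically.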



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

