Posted to commits@pinot.apache.org by xi...@apache.org on 2019/11/19 21:37:05 UTC

[incubator-pinot] 01/01: Make StreamAvroIntoKafkaCommand also support dumping data in Avro format into Kafka

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch stream_avro_into_kafka_in_avro
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit b4e84d7091949081146bbd0d2c1a5ae4b7f0bc65
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Tue Nov 19 13:32:44 2019 -0800

    Make StreamAvroIntoKafkaCommand also support dumping data in Avro format into Kafka
---
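
Example invocation of the command with the new flag, assuming the
standard pinot-admin.sh entry point (file path, broker list, and topic
are placeholders):

    bin/pinot-admin.sh StreamAvroIntoKafka \
      -avroFile /path/to/data.avro \
      -kafkaBrokerList localhost:19092 \
      -kafkaTopic myTopic \
      -outputFormat avro
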
 .../admin/command/StreamAvroIntoKafkaCommand.java  | 28 ++++++++++++++++++----
 1 file changed, 24 insertions(+), 4 deletions(-)

diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
index 66ae944..495f2e7 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
@@ -20,12 +20,17 @@ package org.apache.pinot.tools.admin.command;
 
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
 import org.apache.pinot.core.util.AvroUtils;
@@ -57,9 +62,13 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
   @Option(name = "-zkAddress", required = false, metaVar = "<string>", usage = "Address of Zookeeper.")
   private String _zkAddress = "localhost:2181";
 
+  @Option(name = "-outputFormat", required = false, metaVar = "<string>", usage = "Data format to produce to Kafka; supported: json (default) and avro")
+  private String _outputFormat = "json";
+
   @Option(name = "-millisBetweenMessages", required = false, metaVar = "<int>", usage = "Delay in milliseconds between messages (default 1000 ms)")
   private String _millisBetweenMessages = "1000";
 
+
   @Override
   public boolean getHelp() {
     return _help;
@@ -73,7 +82,7 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
   @Override
   public String toString() {
     return "StreamAvroInfoKafka -avroFile " + _avroFile + " -kafkaBrokerList " + _kafkaBrokerList + " -kafkaTopic "
-        + _kafkaTopic + " -millisBetweenMessages " + _millisBetweenMessages;
+        + _kafkaTopic + " -outputFormat " + _outputFormat + " -millisBetweenMessages " + _millisBetweenMessages;
   }
 
   @Override
@@ -111,12 +120,23 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
     try {
       // Open the Avro file
       DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(new File(_avroFile));
-
+      DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(reader.getSchema());
       // Iterate over every record
       for (GenericRecord genericRecord : reader) {
+        byte[] bytes;
+        switch (_outputFormat) {
+          case "avro":
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
+            datumWriter.write(genericRecord, encoder);
+            encoder.flush();
+            bytes = outputStream.toByteArray();
+            break;
+          default:
+            String recordJson = genericRecord.toString();
+            bytes = recordJson.getBytes("utf-8");
+        }
         // Write the message to Kafka
-        String recordJson = genericRecord.toString();
-        byte[] bytes = recordJson.getBytes("utf-8");
         streamDataProducer.produce(_kafkaTopic, Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)), bytes);
 
         // Sleep between messages

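Note on consuming the avro output: each message is a bare Avro binary
record (no container header, no embedded schema), so the consumer must
obtain the writer schema out of band, e.g. from the same Avro file. A
minimal decode sketch for the consumer side, mirroring the
GenericDatumWriter/BinaryEncoder pair above (the class and method names
here are illustrative, not part of this commit):

    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DecoderFactory;

    public class AvroMessageDecoder {
      private final DatumReader<GenericRecord> _datumReader;

      public AvroMessageDecoder(Schema writerSchema) {
        // Same schema the command reads from the input Avro file.
        _datumReader = new GenericDatumReader<>(writerSchema);
      }

      public GenericRecord decode(byte[] payload) throws IOException {
        // binaryDecoder mirrors the EncoderFactory.get().binaryEncoder(...)
        // call on the producer side.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        return _datumReader.read(null, decoder);
      }
    }

The Kafka key written by the command is just a 64-bit hash of the
payload (HashUtil.hash64 above), so decoding only needs the message
value.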

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org