You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2019/11/19 21:37:05 UTC
[incubator-pinot] 01/01: Make StreamAvroIntoKafkaCommand also
support dumping data in Avro format into Kafka
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch stream_avro_into_kafka_in_avro
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit b4e84d7091949081146bbd0d2c1a5ae4b7f0bc65
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Tue Nov 19 13:32:44 2019 -0800
Make StreamAvroIntoKafkaCommand also support dumping data in Avro format into Kafka
---
.../admin/command/StreamAvroIntoKafkaCommand.java | 28 ++++++++++++++++++----
1 file changed, 24 insertions(+), 4 deletions(-)
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
index 66ae944..495f2e7 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
@@ -20,12 +20,17 @@ package org.apache.pinot.tools.admin.command;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
import org.apache.pinot.core.util.AvroUtils;
@@ -57,9 +62,13 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
@Option(name = "-zkAddress", required = false, metaVar = "<string>", usage = "Address of Zookeeper.")
private String _zkAddress = "localhost:2181";
+ @Option(name = "-outputFormat", required = false, metaVar = "<string>", usage = "Data format to produce to Kafka, supported: json(default) and avro")
+ private String _outputFormat = "json";
+
@Option(name = "-millisBetweenMessages", required = false, metaVar = "<int>", usage = "Delay in milliseconds between messages (default 1000 ms)")
private String _millisBetweenMessages = "1000";
+
@Override
public boolean getHelp() {
return _help;
@@ -73,7 +82,7 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
@Override
public String toString() {
return "StreamAvroInfoKafka -avroFile " + _avroFile + " -kafkaBrokerList " + _kafkaBrokerList + " -kafkaTopic "
- + _kafkaTopic + " -millisBetweenMessages " + _millisBetweenMessages;
+ + _kafkaTopic + " -outputFormat " + _outputFormat + " -millisBetweenMessages " + _millisBetweenMessages;
}
@Override
@@ -111,12 +120,23 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
try {
// Open the Avro file
DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(new File(_avroFile));
-
+ DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(reader.getSchema());
// Iterate over every record
for (GenericRecord genericRecord : reader) {
+ byte[] bytes;
+ switch (_outputFormat) {
+ case "avro":
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
+ datumWriter.write(genericRecord, encoder);
+ encoder.flush();
+ bytes = outputStream.toByteArray();
+ break;
+ default:
+ String recordJson = genericRecord.toString();
+ bytes = recordJson.getBytes("utf-8");
+ }
// Write the message to Kafka
- String recordJson = genericRecord.toString();
- byte[] bytes = recordJson.getBytes("utf-8");
streamDataProducer.produce(_kafkaTopic, Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)), bytes);
// Sleep between messages
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org