You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/13 21:50:06 UTC
[incubator-pulsar] branch master updated: optimizing throughput in
Pulsar Presto connector (#2564)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6ef7aca optimizing throughput in Pulsar Presto connector (#2564)
6ef7aca is described below
commit 6ef7acaf37d57769c4d9dbf7558ef627ce061339
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Sep 13 14:50:01 2018 -0700
optimizing throughput in Pulsar Presto connector (#2564)
### Motivation
1. Currently, the Presto Pulsar connector reads synchronously from bookkeeper when it has run out of entries to process. Basically, we process a batch of entries and then we read more. Ideally, we should be reading and processing in parallel to increase throughput.
2. Each split initializes its own ManagedLedgerFactory/Bookkeeper client. We really just need one Bookkeeper client to be shared among threads.
### Modifications
1. Rewrote the logic in the Presto Pulsar connector to read async and process in parallel
2. Cache ManagedLedgerFactory to be used across splits
### Result
I see about a 2X throughput improvement on a single node as well as on a cluster (2 brokers, 3 bookies, 4 Presto workers including the coordinator) on AWS.
---
conf/presto/catalog/pulsar.properties | 6 +-
.../apache/pulsar/sql/presto/PulsarConnector.java | 5 +
.../pulsar/sql/presto/PulsarConnectorCache.java | 64 +++++++
.../pulsar/sql/presto/PulsarConnectorConfig.java | 26 ++-
.../pulsar/sql/presto/PulsarRecordCursor.java | 201 +++++++++++++++------
.../pulsar/sql/presto/TestPulsarConnector.java | 146 ++++++++-------
.../pulsar/sql/presto/TestPulsarRecordCursor.java | 1 +
7 files changed, 326 insertions(+), 123 deletions(-)
diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties
index 23b945e..5f922e5 100644
--- a/conf/presto/catalog/pulsar.properties
+++ b/conf/presto/catalog/pulsar.properties
@@ -26,4 +26,8 @@ pulsar.zookeeper-uri=localhost:2181
# minimum number of entries to read at a single time
pulsar.entry-read-batch-size=100
# default number of splits to use per query
-pulsar.target-num-splits=4
+pulsar.target-num-splits=2
+# max message queue size
+pulsar.max-split-message-queue-size=10000
+# max entry queue size
+pulsar.max-split-entry-queue-size = 1000
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
index 1d89b51..498583d 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
@@ -87,6 +87,11 @@ public class PulsarConnector implements Connector {
log.error(e, "Failed to close pulsar connector");
}
try {
+ PulsarConnectorCache.shutdown();
+ } catch (Exception e) {
+ log.error("Failed to shutdown pulsar connector cache");
+ }
+ try {
lifeCycleManager.stop();
} catch (Exception e) {
log.error(e, "Error shutting down connector");
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
new file mode 100644
index 0000000..d13ddcd
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
+
+public class PulsarConnectorCache {
+
+ private static PulsarConnectorCache instance;
+
+ private final ManagedLedgerFactory managedLedgerFactory;
+
+ private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
+ this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig);
+ }
+
+ public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
+ synchronized (PulsarConnectorCache.class) {
+ if (instance == null) {
+ instance = new PulsarConnectorCache(pulsarConnectorConfig);
+ }
+ }
+ return instance;
+ }
+
+ private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
+ ClientConfiguration bkClientConfiguration = new ClientConfiguration()
+ .setZkServers(pulsarConnectorConfig.getZookeeperUri())
+ .setAllowShadedLedgerManagerFactoryClass(true)
+ .setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.")
+ .setReadEntryTimeout(60);
+ return new ManagedLedgerFactoryImpl(bkClientConfiguration);
+ }
+
+ public ManagedLedgerFactory getManagedLedgerFactory() {
+ return managedLedgerFactory;
+ }
+
+ public static void shutdown() throws ManagedLedgerException, InterruptedException {
+ if (instance != null) {
+ instance.managedLedgerFactory.shutdown();
+ instance = null;
+ }
+ }
+}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index 1f574c6..482fab3 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -29,7 +29,9 @@ public class PulsarConnectorConfig implements AutoCloseable {
private String brokerServiceUrl = "http://localhost:8080";
private String zookeeperUri = "localhost:2181";
private int entryReadBatchSize = 100;
- private int targetNumSplits = 4;
+ private int targetNumSplits = 2;
+ private int maxSplitMessageQueueSize = 10000;
+ private int maxSplitEntryQueueSize = 1000;
private PulsarAdmin pulsarAdmin;
@NotNull
@@ -77,6 +79,28 @@ public class PulsarConnectorConfig implements AutoCloseable {
}
@NotNull
+ public int getMaxSplitMessageQueueSize() {
+ return this.maxSplitMessageQueueSize;
+ }
+
+ @Config("pulsar.max-split-message-queue-size")
+ public PulsarConnectorConfig setMaxSplitMessageQueueSize(int maxSplitMessageQueueSize) {
+ this.maxSplitMessageQueueSize = maxSplitMessageQueueSize;
+ return this;
+ }
+
+ @NotNull
+ public int getMaxSplitEntryQueueSize() {
+ return this.maxSplitEntryQueueSize;
+ }
+
+ @Config("pulsar.max-split-entry-queue-size")
+ public PulsarConnectorConfig setMaxSplitEntryQueueSize(int maxSplitEntryQueueSize) {
+ this.maxSplitEntryQueueSize = maxSplitEntryQueueSize;
+ return this;
+ }
+
+ @NotNull
public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
if (this.pulsarAdmin == null) {
this.pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(getBrokerServiceUrl()).build();
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index ef56e6c..c8106aa 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -28,6 +28,7 @@ import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.apache.avro.Schema;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -44,10 +45,11 @@ import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
import java.io.IOException;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
@@ -62,35 +64,47 @@ import static com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_W
import static com.facebook.presto.spi.type.TinyintType.TINYINT;
import static com.google.common.base.Preconditions.checkArgument;
+
public class PulsarRecordCursor implements RecordCursor {
private List<PulsarColumnHandle> columnHandles;
private PulsarSplit pulsarSplit;
private PulsarConnectorConfig pulsarConnectorConfig;
- private ManagedLedgerFactory managedLedgerFactory;
private ReadOnlyCursor cursor;
- private Queue<Message> messageQueue = new LinkedList<>();
+ private ArrayBlockingQueue<Message> messageQueue;
+ private ArrayBlockingQueue<Entry> entryQueue;
private Object currentRecord;
private Message currentMessage;
private Map<String, PulsarInternalColumn> internalColumnMap = PulsarInternalColumn.getInternalFieldsMap();
private SchemaHandler schemaHandler;
private int batchSize;
- private long completedBytes = 0L;
+ private AtomicLong completedBytes = new AtomicLong(0L);
+ private ReadEntries readEntries;
+ private DeserializeEntries deserializeEntries;
+ private TopicName topicName;
private static final Logger log = Logger.get(PulsarRecordCursor.class);
+ private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
+ ClientConfiguration bkClientConfiguration = new ClientConfiguration()
+ .setZkServers(pulsarConnectorConfig.getZookeeperUri())
+ .setAllowShadedLedgerManagerFactoryClass(true)
+ .setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.");
+ return new ManagedLedgerFactoryImpl(bkClientConfiguration);
+ }
+
public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit,
PulsarConnectorConfig pulsarConnectorConfig) {
-
- ManagedLedgerFactory managedLedgerFactory;
+ PulsarConnectorCache pulsarConnectorCache;
try {
- managedLedgerFactory = getManagedLedgerFactory(pulsarConnectorConfig);
+ pulsarConnectorCache = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig);
} catch (Exception e) {
- log.error(e, "Failed to initialize managed ledger factory");
+ log.error(e, "Failed to initialize Pulsar connector cache");
close();
throw new RuntimeException(e);
}
- initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory);
+ initialize(columnHandles, pulsarSplit, pulsarConnectorConfig,
+ pulsarConnectorCache.getManagedLedgerFactory());
}
// Exposed for testing purposes
@@ -105,7 +119,11 @@ public class PulsarRecordCursor implements RecordCursor {
this.pulsarSplit = pulsarSplit;
this.pulsarConnectorConfig = pulsarConnectorConfig;
this.batchSize = pulsarConnectorConfig.getEntryReadBatchSize();
- this.managedLedgerFactory = managedLedgerFactory;
+ this.messageQueue = new ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
+ this.entryQueue = new ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitEntryQueueSize());
+ this.topicName = TopicName.get("persistent",
+ NamespaceName.get(pulsarSplit.getSchemaName()),
+ pulsarSplit.getTableName());
Schema schema = PulsarConnectorUtils.parseSchema(pulsarSplit.getSchema());
@@ -149,18 +167,9 @@ public class PulsarRecordCursor implements RecordCursor {
return cursor;
}
- private ManagedLedgerFactory getManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
- ClientConfiguration bkClientConfiguration = new ClientConfiguration()
- .setZkServers(pulsarConnectorConfig.getZookeeperUri())
- .setAllowShadedLedgerManagerFactoryClass(true)
- .setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.");
- return new ManagedLedgerFactoryImpl(bkClientConfiguration);
- }
-
-
@Override
public long getCompletedBytes() {
- return this.completedBytes;
+ return this.completedBytes.get();
}
@Override
@@ -174,39 +183,48 @@ public class PulsarRecordCursor implements RecordCursor {
return columnHandles.get(field).getType();
}
- @Override
- public boolean advanceNextPosition() {
+ @VisibleForTesting
+ class DeserializeEntries implements Runnable {
- if (this.messageQueue.isEmpty()) {
- if (!this.cursor.hasMoreEntries()) {
- return false;
- }
- if (((PositionImpl) this.cursor.getReadPosition())
- .compareTo(this.pulsarSplit.getEndPosition()) >= 0) {
- return false;
- }
+ protected AtomicBoolean isRunning = new AtomicBoolean(false);
- TopicName topicName = TopicName.get("persistent",
- NamespaceName.get(this.pulsarSplit.getSchemaName()),
- this.pulsarSplit.getTableName());
+ private final Thread thread;
- List<Entry> newEntries;
- try {
- newEntries = this.cursor.readEntries(this.batchSize);
- } catch (InterruptedException | ManagedLedgerException e) {
- log.error(e, "Failed to read new entries from pulsar topic %s", topicName.toString());
- throw new RuntimeException(e);
- }
+ public DeserializeEntries() {
+ this.thread = new Thread(this);
+ }
+
+ public void interrupt() {
+ isRunning.set(false);
+ thread.interrupt();
+ }
+
+ public void start() {
+ this.thread.start();
+ }
- newEntries.forEach(entry -> {
+ @Override
+ public void run() {
+ isRunning.set(true);
+ while (isRunning.get()) {
+ Entry entry;
try {
- completedBytes += entry.getDataBuffer().readableBytes();
+ entry = entryQueue.take();
+ } catch (InterruptedException e) {
+ break;
+ }
+ try {
+ completedBytes.addAndGet(entry.getDataBuffer().readableBytes());
// filter entries that is not part of my split
if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) < 0) {
try {
MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
entry.getDataBuffer(), (messageId, message, byteBuf) -> {
- messageQueue.add(message);
+ try {
+ messageQueue.put(message);
+ } catch (InterruptedException e) {
+ //no-op
+ }
});
} catch (IOException e) {
log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
@@ -216,15 +234,92 @@ public class PulsarRecordCursor implements RecordCursor {
} finally {
entry.release();
}
- });
+ }
}
+ }
- this.currentMessage = this.messageQueue.poll();
- currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData());
+ @VisibleForTesting
+ class ReadEntries implements AsyncCallbacks.ReadEntriesCallback {
+
+ // indicate whether there are any additional entries left to read
+ private final AtomicBoolean isDone = new AtomicBoolean(false);
+
+ //num of outstanding read requests
+ // set to 1 because we can only read one batch a time
+ private final AtomicLong outstandingReadsRequests = new AtomicLong(1);
+
+ public void run() {
+
+ if (outstandingReadsRequests.get() > 0) {
+ if (!cursor.hasMoreEntries() || ((PositionImpl) cursor.getReadPosition())
+ .compareTo(pulsarSplit.getEndPosition()) >= 0) {
+ isDone.set(true);
+
+ } else if (entryQueue.remainingCapacity() > batchSize) {
+ outstandingReadsRequests.decrementAndGet();
+ cursor.asyncReadEntries(batchSize, this, System.currentTimeMillis());
+ }
+ }
+ }
+
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ entryQueue.addAll(entries);
+ outstandingReadsRequests.incrementAndGet();
+ }
+
+ public boolean hashFinished() {
+ return messageQueue.isEmpty() && entryQueue.isEmpty() && isDone.get() && outstandingReadsRequests.get() >=1;
+ }
+
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ log.debug(exception, "Failed to read entries from topic %s", topicName.toString());
+ outstandingReadsRequests.incrementAndGet();
+ }
+ }
+
+
+ @Override
+ public boolean advanceNextPosition() {
+
+ if (readEntries == null) {
+ readEntries = new ReadEntries();
+ readEntries.run();
+
+ // start deserialize thread
+ deserializeEntries = new DeserializeEntries();
+ deserializeEntries.start();
+ }
+
+ while(true) {
+ if (readEntries.hashFinished()) {
+ return false;
+ }
+
+ if (messageQueue.remainingCapacity() > 0) {
+ readEntries.run();
+ }
+
+ currentMessage = messageQueue.poll();
+ if (currentMessage != null) {
+ break;
+ } else {
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData());
return true;
}
+
@VisibleForTesting
Object getRecord(int fieldIndex) {
if (this.currentRecord == null) {
@@ -317,17 +412,13 @@ public class PulsarRecordCursor implements RecordCursor {
@Override
public void close() {
- if (this.cursor != null) {
- try {
- this.cursor.close();
- } catch (Exception e) {
- log.error(e);
- }
+ if (deserializeEntries != null) {
+ deserializeEntries.interrupt();
}
- if (managedLedgerFactory != null) {
+ if (this.cursor != null) {
try {
- managedLedgerFactory.shutdown();
+ this.cursor.close();
} catch (Exception e) {
log.error(e);
}
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 0882efc..5d8472d 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -27,6 +27,7 @@ import com.facebook.presto.spi.type.RealType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarcharType;
import io.airlift.log.Logger;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
@@ -68,6 +69,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -662,6 +664,9 @@ public abstract class TestPulsarConnector {
@BeforeMethod
public void setup() throws Exception {
this.pulsarConnectorConfig = spy(new PulsarConnectorConfig());
+ this.pulsarConnectorConfig.setEntryReadBatchSize(1);
+ this.pulsarConnectorConfig.setMaxSplitEntryQueueSize(10);
+ this.pulsarConnectorConfig.setMaxSplitMessageQueueSize(100);
Tenants tenants = mock(Tenants.class);
doReturn(new LinkedList<>(topicNames.stream().map(new Function<TopicName, String>() {
@@ -786,77 +791,86 @@ public abstract class TestPulsarConnector {
}
});
- when(readOnlyCursor.readEntries(anyInt())).thenAnswer(new Answer<List<Entry>>() {
+ doAnswer(new Answer() {
@Override
- public List<Entry> answer(InvocationOnMock invocationOnMock) throws Throwable {
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
Object[] args = invocationOnMock.getArguments();
Integer readEntries = (Integer) args[0];
+ AsyncCallbacks.ReadEntriesCallback callback = (AsyncCallbacks.ReadEntriesCallback) args[1];
+ Object ctx = args[2];
+
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ List < Entry > entries = new LinkedList<>();
+ for (int i = 0; i < readEntries; i++) {
+
+ Foo.Bar foobar = new Foo.Bar();
+ foobar.field1 = (int) fooFunctions.get("bar.test.foobar.field1").apply(count);
+
+ Boo boo1 = new Boo();
+ boo1.field4 = (double) fooFunctions.get("bar.test.field4").apply(count);
+ boo1.field5 = (boolean) fooFunctions.get("bar.test.field5").apply(count);
+ boo1.field6 = (long) fooFunctions.get("bar.test.field6").apply(count);
+ boo1.foo = new Foo();
+ boo1.boo = null;
+ boo1.bar = new Bar();
+ boo1.foobar = foobar;
+
+ Boo boo2 = new Boo();
+ boo2.field4 = (double) fooFunctions.get("bar.test2.field4").apply(count);
+ boo2.field5 = (boolean) fooFunctions.get("bar.test2.field5").apply(count);
+ boo2.field6 = (long) fooFunctions.get("bar.test2.field6").apply(count);
+ boo2.foo = new Foo();
+ boo2.boo = boo1;
+ boo2.bar = new Bar();
+ boo2.foobar = foobar;
+
+ TestPulsarConnector.Bar bar = new TestPulsarConnector.Bar();
+ bar.field1 = fooFunctions.get("bar.field1").apply(count) == null ? null : (int) fooFunctions.get("bar.field1").apply(count);
+ bar.field2 = fooFunctions.get("bar.field2").apply(count) == null ? null : (String) fooFunctions.get("bar.field2").apply(count);
+ bar.field3 = (float) fooFunctions.get("bar.field3").apply(count);
+ bar.test = boo1;
+ bar.test2 = count % 2 == 0 ? null : boo2;
+
+ Foo foo = new Foo();
+ foo.field1 = (int) fooFunctions.get("field1").apply(count);
+ foo.field2 = (String) fooFunctions.get("field2").apply(count);
+ foo.field3 = (float) fooFunctions.get("field3").apply(count);
+ foo.field4 = (double) fooFunctions.get("field4").apply(count);
+ foo.field5 = (boolean) fooFunctions.get("field5").apply(count);
+ foo.field6 = (long) fooFunctions.get("field6").apply(count);
+ foo.timestamp = (long) fooFunctions.get("timestamp").apply(count);
+ foo.time = (int) fooFunctions.get("time").apply(count);
+ foo.date = (int) fooFunctions.get("date").apply(count);
+ foo.bar = bar;
+
+ PulsarApi.MessageMetadata messageMetadata = PulsarApi.MessageMetadata.newBuilder()
+ .setProducerName("test-producer").setSequenceId(positions.get(topic))
+ .setPublishTime(System.currentTimeMillis()).build();
+
+ Schema schema = topicsToSchemas.get(schemaName).getType() == SchemaType.AVRO ? AvroSchema.of(Foo.class) : JSONSchema.of(Foo.class);
+
+ org.apache.pulsar.shade.io.netty.buffer.ByteBuf payload
+ = org.apache.pulsar.shade.io.netty.buffer.Unpooled.copiedBuffer(schema.encode(foo));
+
+ ByteBuf byteBuf = serializeMetadataAndPayload
+ (Commands.ChecksumType.Crc32c, messageMetadata, payload);
+
+ completedBytes += byteBuf.readableBytes();
+
+ entries.add(EntryImpl.create(0, positions.get(topic), byteBuf));
+ positions.put(topic, positions.get(topic) + 1);
+ count++;
+ }
+
+ callback.readEntriesComplete(entries, ctx);
+ }
+ }).start();
- List<Entry> entries = new LinkedList<>();
- for (int i = 0; i < readEntries; i++) {
-
- Foo.Bar foobar = new Foo.Bar();
- foobar.field1 = (int) fooFunctions.get("bar.test.foobar.field1").apply(count);
-
- Boo boo1 = new Boo();
- boo1.field4 = (double) fooFunctions.get("bar.test.field4").apply(count);
- boo1.field5 = (boolean) fooFunctions.get("bar.test.field5").apply(count);
- boo1.field6 = (long) fooFunctions.get("bar.test.field6").apply(count);
- boo1.foo = new Foo();
- boo1.boo = null;
- boo1.bar = new Bar();
- boo1.foobar = foobar;
-
- Boo boo2 = new Boo();
- boo2.field4 = (double) fooFunctions.get("bar.test2.field4").apply(count);
- boo2.field5 = (boolean) fooFunctions.get("bar.test2.field5").apply(count);
- boo2.field6 = (long) fooFunctions.get("bar.test2.field6").apply(count);
- boo2.foo = new Foo();
- boo2.boo = boo1;
- boo2.bar = new Bar();
- boo2.foobar = foobar;
-
- TestPulsarConnector.Bar bar = new TestPulsarConnector.Bar();
- bar.field1 = fooFunctions.get("bar.field1").apply(count) == null ? null : (int) fooFunctions.get("bar.field1").apply(count);
- bar.field2 = fooFunctions.get("bar.field2").apply(count) == null ? null : (String) fooFunctions.get("bar.field2").apply(count);
- bar.field3 = (float) fooFunctions.get("bar.field3").apply(count);
- bar.test = boo1;
- bar.test2 = count % 2 == 0 ? null : boo2;
-
- Foo foo = new Foo();
- foo.field1 = (int) fooFunctions.get("field1").apply(count);
- foo.field2 = (String) fooFunctions.get("field2").apply(count);
- foo.field3 = (float) fooFunctions.get("field3").apply(count);
- foo.field4 = (double) fooFunctions.get("field4").apply(count);
- foo.field5 = (boolean) fooFunctions.get("field5").apply(count);
- foo.field6 = (long) fooFunctions.get("field6").apply(count);
- foo.timestamp = (long) fooFunctions.get("timestamp").apply(count);
- foo.time = (int) fooFunctions.get("time").apply(count);
- foo.date = (int) fooFunctions.get("date").apply(count);
- foo.bar = bar;
-
- PulsarApi.MessageMetadata messageMetadata = PulsarApi.MessageMetadata.newBuilder()
- .setProducerName("test-producer").setSequenceId(positions.get(topic))
- .setPublishTime(System.currentTimeMillis()).build();
-
- Schema schema = topicsToSchemas.get(schemaName).getType() == SchemaType.AVRO ? AvroSchema.of(Foo.class) : JSONSchema.of(Foo.class);
-
- org.apache.pulsar.shade.io.netty.buffer.ByteBuf payload
- = org.apache.pulsar.shade.io.netty.buffer.Unpooled.copiedBuffer(schema.encode(foo));
-
- ByteBuf byteBuf = serializeMetadataAndPayload
- (Commands.ChecksumType.Crc32c, messageMetadata, payload);
-
- completedBytes += byteBuf.readableBytes();
-
- entries.add(EntryImpl.create(0, positions.get(topic), byteBuf));
- positions.put(topic, positions.get(topic) + 1);
- count++;
- }
-
- return entries;
+ return null;
}
- });
+ }).when(readOnlyCursor).asyncReadEntries(anyInt(), any(), any());
when(readOnlyCursor.hasMoreEntries()).thenAnswer(new Answer<Boolean>() {
@Override
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 1b323a2..b0fc42a 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -122,6 +122,7 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
Assert.assertEquals(count, topicsToNumEntries.get(topicName.getSchemaName()).longValue());
Assert.assertEquals(pulsarRecordCursor.getCompletedBytes(), completedBytes);
cleanup();
+ pulsarRecordCursor.close();
}
}
}