You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/08/05 09:08:09 UTC
[pulsar] branch master updated: [Pulsar SQL] Make Pulsar SQL get
correct offload configurations (#7701)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9f687d3 [Pulsar SQL] Make Pulsar SQL get correct offload configurations (#7701)
9f687d3 is described below
commit 9f687d3f67d7534cb9ee77ddd2c7d0f24d34d0b4
Author: ran <ga...@126.com>
AuthorDate: Wed Aug 5 17:07:52 2020 +0800
[Pulsar SQL] Make Pulsar SQL get correct offload configurations (#7701)
### Motivation
Currently, Pulsar SQL can't get the correct offload configurations.
### Modifications
Make Pulsar SQL get the complete offload configurations.
### Verifying this change
Add a new integration test.
---
.github/workflows/ci-integration-sql.yaml | 9 ++
conf/presto/config.properties | 2 +
.../common/policies/data/OffloadPolicies.java | 5 +-
.../pulsar/sql/presto/PulsarConnectorCache.java | 6 +-
.../pulsar/sql/presto/PulsarConnectorConfig.java | 12 ++
.../sql/presto/TestPulsarConnectorConfig.java | 32 ++++
.../tests/integration/presto/TestBasicPresto.java | 1 +
...esto.java => TestPrestoQueryTieredStorage.java} | 173 +++++++++++++++++----
.../integration/topologies/PulsarCluster.java | 17 +-
9 files changed, 225 insertions(+), 32 deletions(-)
diff --git a/.github/workflows/ci-integration-sql.yaml b/.github/workflows/ci-integration-sql.yaml
index 500fd42..29f9710 100644
--- a/.github/workflows/ci-integration-sql.yaml
+++ b/.github/workflows/ci-integration-sql.yaml
@@ -73,6 +73,15 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: mvn clean install -DskipTests
+# Flaky Test: https://github.com/apache/pulsar/issues/7750
+# - name: build pulsar image
+# if: steps.docs.outputs.changed_only == 'no'
+# run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
+#
+# - name: build pulsar-all image
+# if: steps.docs.outputs.changed_only == 'no'
+# run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
+#
- name: build artifacts and docker pulsar latest test image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests
diff --git a/conf/presto/config.properties b/conf/presto/config.properties
index 9f17135..2ca62ae 100644
--- a/conf/presto/config.properties
+++ b/conf/presto/config.properties
@@ -38,5 +38,7 @@ query.client.timeout=5m
query.min-expire-age=30m
presto.version=testversion
+
distributed-joins-enabled=true
+
node-scheduler.include-coordinator=true
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index 4936923..f377802 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.common.policies.data;
import static org.apache.pulsar.common.util.FieldParser.value;
import com.google.common.base.MoreObjects;
+import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Objects;
@@ -32,7 +33,9 @@ import org.apache.commons.lang3.StringUtils;
* Definition of the offload policies.
*/
@Data
-public class OffloadPolicies {
+public class OffloadPolicies implements Serializable {
+
+ private final static long serialVersionUID = 0L;
public final static int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 1024; // 64MB
public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MB
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index f4a7d74..091efe1 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -40,7 +40,6 @@ import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
import org.apache.bookkeeper.mledger.offload.Offloaders;
import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -81,9 +80,8 @@ public class PulsarConnectorCache {
this.statsProvider.start(clientConfiguration);
- OffloadPolicies offloadPolicies = new OffloadPolicies();
- BeanUtils.copyProperties(offloadPolicies, pulsarConnectorConfig);
- this.defaultOffloader = initManagedLedgerOffloader(offloadPolicies, pulsarConnectorConfig);
+ this.defaultOffloader = initManagedLedgerOffloader(
+ pulsarConnectorConfig.getOffloadPolices(), pulsarConnectorConfig);
}
public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index 49d2ae3..0a172d9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -23,6 +23,7 @@ import io.airlift.configuration.Config;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.regex.Matcher;
import javax.validation.constraints.NotNull;
import org.apache.bookkeeper.stats.NullStatsProvider;
@@ -31,6 +32,7 @@ import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.protocol.Commands;
/**
@@ -399,6 +401,16 @@ public class PulsarConnectorConfig implements AutoCloseable {
return this.pulsarAdmin;
}
+ public OffloadPolicies getOffloadPolices() {
+ Properties offloadProperties = new Properties();
+ offloadProperties.putAll(getOffloaderProperties());
+ OffloadPolicies offloadPolicies = OffloadPolicies.create(offloadProperties);
+ offloadPolicies.setManagedLedgerOffloadDriver(getManagedLedgerOffloadDriver());
+ offloadPolicies.setManagedLedgerOffloadMaxThreads(getManagedLedgerOffloadMaxThreads());
+ offloadPolicies.setOffloadersDirectory(getOffloadersDirectory());
+ return offloadPolicies;
+ }
+
@Override
public void close() throws Exception {
this.pulsarAdmin.close();
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java
index faf2bbc..f3d2f7a 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.sql.presto;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -68,4 +69,35 @@ public class TestPulsarConnectorConfig {
Assert.assertEquals(availableProcessors, connectorConfig.getManagedLedgerNumSchedulerThreads());
}
+ @Test
+ public void testGetOffloadPolices() throws Exception {
+ PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig();
+
+ final String managedLedgerOffloadDriver = "s3";
+ final String offloaderDirectory = "/pulsar/offloaders";
+ final int managedLedgerOffloadMaxThreads = 5;
+ final String bucket = "offload-bucket";
+ final String region = "us-west-2";
+ final String endpoint = "http://s3.amazonaws.com";
+ final String offloadProperties = "{"
+ + "\"s3ManagedLedgerOffloadBucket\":\"" + bucket + "\","
+ + "\"s3ManagedLedgerOffloadRegion\":\"" + region + "\","
+ + "\"s3ManagedLedgerOffloadServiceEndpoint\":\"" + endpoint + "\""
+ + "}";
+
+ connectorConfig.setManagedLedgerOffloadDriver(managedLedgerOffloadDriver);
+ connectorConfig.setOffloadersDirectory(offloaderDirectory);
+ connectorConfig.setManagedLedgerOffloadMaxThreads(managedLedgerOffloadMaxThreads);
+ connectorConfig.setOffloaderProperties(offloadProperties);
+
+ OffloadPolicies offloadPolicies = connectorConfig.getOffloadPolices();
+ Assert.assertNotNull(offloadPolicies);
+ Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDriver(), managedLedgerOffloadDriver);
+ Assert.assertEquals(offloadPolicies.getOffloadersDirectory(), offloaderDirectory);
+ Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadMaxThreads(), managedLedgerOffloadMaxThreads);
+ Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadBucket(), bucket);
+ Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadRegion(), region);
+ Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadServiceEndpoint(), endpoint);
+ }
+
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index 093f3eb..e29d945 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -50,6 +50,7 @@ public class TestBasicPresto extends PulsarTestSuite {
@BeforeClass
public void setupPresto() throws Exception {
+ log.info("[setupPresto]");
pulsarCluster.startPrestoWorker();
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
similarity index 53%
copy from tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
copy to tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
index 093f3eb..8ba48aa 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
@@ -20,17 +20,24 @@ package org.apache.pulsar.tests.integration.presto;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.containers.S3Container;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.shaded.org.apache.commons.lang.StringUtils;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -38,38 +45,83 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import static com.google.common.base.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThat;
@Slf4j
-public class TestBasicPresto extends PulsarTestSuite {
+public class TestPrestoQueryTieredStorage extends PulsarTestSuite {
- private static final int NUM_OF_STOCKS = 10;
+ private final static int ENTRIES_PER_LEDGER = 1024;
+ private final static String OFFLOAD_DRIVER = "s3";
+ private final static String BUCKET = "pulsar-integtest";
+ private final static String ENDPOINT = "http://" + S3Container.NAME + ":9090";
+
+ private S3Container s3Container;
+
+ @Override
+ protected void beforeStartCluster() throws Exception {
+ for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
+ getEnv().forEach(brokerContainer::withEnv);
+ }
+ }
@BeforeClass
public void setupPresto() throws Exception {
- pulsarCluster.startPrestoWorker();
+ s3Container = new S3Container(
+ pulsarCluster.getClusterName(),
+ S3Container.NAME)
+ .withNetwork(pulsarCluster.getNetwork())
+ .withNetworkAliases(S3Container.NAME);
+ s3Container.start();
+
+ log.info("[setupPresto] prestoWorker: " + pulsarCluster.getPrestoWorkerContainer());
+ pulsarCluster.startPrestoWorker(OFFLOAD_DRIVER, getOffloadProperties(BUCKET, null, ENDPOINT));
}
+ public String getOffloadProperties(String bucket, String region, String endpoint) {
+ checkNotNull(bucket);
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ sb.append("\"s3ManagedLedgerOffloadBucket\":").append("\"").append(bucket).append("\",");
+ if (StringUtils.isNotEmpty(region)) {
+ sb.append("\"s3ManagedLedgerOffloadRegion\":").append("\"").append(region).append("\",");
+ }
+ if (StringUtils.isNotEmpty(endpoint)) {
+ sb.append("\"s3ManagedLedgerOffloadServiceEndpoint\":").append("\"").append(endpoint).append("\"");
+ }
+ sb.append("}");
+ return sb.toString();
+ }
+
+
@AfterClass
public void teardownPresto() {
log.info("tearing down...");
+ if (null != s3Container) {
+ s3Container.stop();
+ }
+
pulsarCluster.stopPrestoWorker();
}
- @Test
- public void testSimpleSQLQueryBatched() throws Exception {
- testSimpleSQLQuery(true);
+ // Flaky Test: https://github.com/apache/pulsar/issues/7750
+ // @Test
+ public void testQueryTieredStorage1() throws Exception {
+ testSimpleSQLQuery(false);
}
- @Test
- public void testSimpleSQLQueryNonBatched() throws Exception {
- testSimpleSQLQuery(false);
+ // Flaky Test: https://github.com/apache/pulsar/issues/7750
+ // @Test
+ public void testQueryTieredStorage2() throws Exception {
+ testSimpleSQLQuery(true);
}
-
- public void testSimpleSQLQuery(boolean isBatched) throws Exception {
+
+ public void testSimpleSQLQuery(boolean isNamespaceOffload) throws Exception {
// wait until presto worker started
ContainerExecResult result;
@@ -93,42 +145,51 @@ public class TestBasicPresto extends PulsarTestSuite {
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
- String stocksTopic;
- if (isBatched) {
- stocksTopic = "stocks_batched";
- } else {
- stocksTopic = "stocks_nonbatched";
- }
+ String stocksTopic = "stocks-" + randomName(5);
@Cleanup
Producer<Stock> producer = pulsarClient.newProducer(JSONSchema.of(Stock.class))
.topic(stocksTopic)
- .enableBatching(isBatched)
.create();
- for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
- final Stock stock = new Stock(i,"STOCK_" + i , 100.0 + i * 10);
- producer.send(stock);
+ long firstLedgerId = -1;
+ long currentLedgerId = -1;
+ int sendMessageCnt = 0;
+ while (currentLedgerId <= firstLedgerId) {
+ sendMessageCnt ++;
+ final Stock stock = new Stock(sendMessageCnt,"STOCK_" + sendMessageCnt , 100.0 + sendMessageCnt * 10);
+ MessageIdImpl messageId = (MessageIdImpl) producer.send(stock);
+ if (firstLedgerId == -1) {
+ firstLedgerId = messageId.getLedgerId();
+ }
+ currentLedgerId = messageId.getLedgerId();
+ log.info("firstLedgerId: {}, currentLedgerId: {}", firstLedgerId, currentLedgerId);
+ Thread.sleep(100);
}
producer.flush();
+ offloadAndDeleteFromBK(isNamespaceOffload, stocksTopic);
+
+ // check schema
result = execQuery("show schemas in pulsar;");
assertThat(result.getExitCode()).isEqualTo(0);
assertThat(result.getStdout()).contains("public/default");
+ // check table
result = execQuery("show tables in pulsar.\"public/default\";");
assertThat(result.getExitCode()).isEqualTo(0);
- assertThat(result.getStdout()).contains("stocks");
+ assertThat(result.getStdout()).contains(stocksTopic);
+ // check query
ContainerExecResult containerExecResult = execQuery(String.format("select * from pulsar.\"public/default\".%s order by entryid;", stocksTopic));
assertThat(containerExecResult.getExitCode()).isEqualTo(0);
log.info("select sql query output \n{}", containerExecResult.getStdout());
String[] split = containerExecResult.getStdout().split("\n");
- assertThat(split.length).isGreaterThan(NUM_OF_STOCKS - 2);
+ assertThat(split.length).isGreaterThan(sendMessageCnt - 2);
String[] split2 = containerExecResult.getStdout().split("\n|,");
- for (int i = 0; i < NUM_OF_STOCKS - 2; ++i) {
+ for (int i = 0; i < sendMessageCnt - 2; ++i) {
assertThat(split2).contains("\"" + i + "\"");
assertThat(split2).contains("\"" + "STOCK_" + i + "\"");
assertThat(split2).contains("\"" + (100.0 + i * 10) + "\"");
@@ -150,7 +211,7 @@ public class TestBasicPresto extends PulsarTestSuite {
timestamps.add(res.getTimestamp("__publish_time__"));
}
- assertThat(timestamps.size()).isGreaterThan(NUM_OF_STOCKS - 2);
+ assertThat(timestamps.size()).isGreaterThan(sendMessageCnt - 2);
query = String.format("select * from pulsar" +
".\"public/default\".%s where __publish_time__ > timestamp '%s' order by __publish_time__", stocksTopic, timestamps.get(timestamps.size() / 2));
@@ -225,4 +286,64 @@ public class TestBasicPresto extends PulsarTestSuite {
}
+ private void offloadAndDeleteFromBK(boolean isNamespaceOffload, String stocksTopic) {
+ String adminUrl = pulsarCluster.getHttpServiceUrl();
+ try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+ // read managed ledger info, check ledgers exist
+ long firstLedger = admin.topics().getInternalStats(stocksTopic).ledgers.get(0).ledgerId;
+
+ String output = "";
+
+ if (isNamespaceOffload) {
+ pulsarCluster.runAdminCommandOnAnyBroker(
+ "namespaces", "set-offload-policies",
+ "--bucket", "pulsar-integtest",
+ "--driver", "s3",
+ "--endpoint", "http://" + S3Container.NAME + ":9090",
+ "--offloadAfterElapsed", "1000",
+ "public/default");
+
+ output = pulsarCluster.runAdminCommandOnAnyBroker(
+ "namespaces", "get-offload-policies").getStdout();
+ Assert.assertTrue(output.contains("pulsar-integtest"));
+ Assert.assertTrue(output.contains("s3"));
+ }
+
+ // offload with a low threshold
+ output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
+ "offload", "--size-threshold", "1M", stocksTopic).getStdout();
+ Assert.assertTrue(output.contains("Offload triggered"));
+
+ output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
+ "offload-status", "-w", stocksTopic).getStdout();
+ Assert.assertTrue(output.contains("Offload was a success"));
+
+ // delete the first ledger, so that we cannot possibly read from it
+ ClientConfiguration bkConf = new ClientConfiguration();
+ bkConf.setZkServers(pulsarCluster.getZKConnString());
+ try (BookKeeper bk = new BookKeeper(bkConf)) {
+ bk.deleteLedger(firstLedger);
+ } catch (Exception e) {
+ log.error("Failed to delete from BookKeeper.", e);
+ Assert.fail("Failed to delete from BookKeeper.");
+ }
+
+ // Unload topic to clear all caches, open handles, etc
+ admin.topics().unload(stocksTopic);
+ } catch (Exception e) {
+ Assert.fail("Failed to deleteOffloadedDataFromBK.");
+ }
+ }
+
+ protected Map<String, String> getEnv() {
+ Map<String, String> result = new HashMap<>();
+ result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER));
+ result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
+ result.put("managedLedgerOffloadDriver", OFFLOAD_DRIVER);
+ result.put("s3ManagedLedgerOffloadBucket", BUCKET);
+ result.put("s3ManagedLedgerOffloadServiceEndpoint", ENDPOINT);
+
+ return result;
+ }
+
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 6ffd38a..36175a6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -328,6 +328,11 @@ public class PulsarCluster {
}
public void startPrestoWorker() {
+ startPrestoWorker(null, null);
+ }
+
+ public void startPrestoWorker(String offloadDriver, String offloadProperties) {
+ log.info("[startPrestoWorker] offloadDriver: {}, offloadProperties: {}", offloadDriver, offloadProperties);
if (null == prestoWorkerContainer) {
prestoWorkerContainer = new PrestoWorkerContainer(clusterName, PrestoWorkerContainer.NAME)
.withNetwork(network)
@@ -337,8 +342,18 @@ public class PulsarCluster {
.withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
.withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
.withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080");
+ if (offloadDriver != null && offloadProperties != null) {
+ log.info("[startPrestoWorker] set offload env offloadDriver: {}, offloadProperties: {}",
+ offloadDriver, offloadProperties);
+ prestoWorkerContainer.withEnv("PULSAR_PREFIX_pulsar.managed-ledger-offload-driver", offloadDriver);
+ prestoWorkerContainer.withEnv("PULSAR_PREFIX_pulsar.offloader-properties", offloadProperties);
+ prestoWorkerContainer.withEnv("PULSAR_PREFIX_pulsar.offloaders-directory", "/pulsar/offloaders");
+ // used in s3 tests
+ prestoWorkerContainer.withEnv("AWS_ACCESS_KEY_ID", "accesskey");
+ prestoWorkerContainer.withEnv("AWS_SECRET_KEY", "secretkey");
+ }
}
- log.info("Starting Presto Worker");
+ log.info("[startPrestoWorker] Starting Presto Worker");
prestoWorkerContainer.start();
}