You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2021/04/17 01:30:36 UTC
[pulsar] branch master updated: PIP-82 incorporate review feedback (#10201)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1cc782f PIP-82 incorporate review feedback (#10201)
1cc782f is described below
commit 1cc782f88b8934e1c636d45d9f6fc25311ebe260
Author: Bharani Chadalavada <bh...@gmail.com>
AuthorDate: Fri Apr 16 18:29:27 2021 -0700
PIP-82 incorporate review feedback (#10201)
---
pom.xml | 2 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 6 ---
pulsar-broker/pom.xml | 2 +
.../org/apache/pulsar/broker/PulsarService.java | 5 +++
.../ResourceUsageTransportManager.java | 44 ++++++++++++++++++----
.../ResourceUsageTransportManagerTest.java | 5 +--
6 files changed, 45 insertions(+), 19 deletions(-)
diff --git a/pom.xml b/pom.xml
index d7b5b72..209dcc1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -235,7 +235,7 @@ flexible messaging model and an intuitive client API.</description>
<errorprone.version>2.5.1</errorprone.version>
<errorprone.javac.version>9+181-r4173-1</errorprone.javac.version>
<errorprone-slf4j.version>0.1.4</errorprone-slf4j.version>
- <lightproto-maven-plugin.version>0.2</lightproto-maven-plugin.version>
+ <lightproto-maven-plugin.version>0.4</lightproto-maven-plugin.version>
<!-- Used to configure rename.netty.native. Libs -->
<rename.netty.native.libs>rename-netty-native-libs.sh</rename.netty.native.libs>
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 856f826..358345c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -688,12 +688,6 @@ public class ServiceConfiguration implements PulsarConfiguration {
private String resourceUsageTransportClassName = "";
@FieldContext(
- category = CATEGORY_POLICIES,
- doc = "Topic to publish usage reports to if resourceUsagePublishToTopic is enabled."
- )
- private String resourceUsageTransportPublishTopicName = "non-persistent://pulsar/system/resource-usage";
-
- @FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Default interval to publish usage reports if resourceUsagePublishToTopic is enabled."
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 49609c2..cb13048 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -491,6 +491,8 @@
<version>${lightproto-maven-plugin.version}</version>
<configuration>
<sources>${project.basedir}/src/main/proto/ResourceUsage.proto</sources>
+ <targetSourcesSubDir>generated-sources/lightproto/java</targetSourcesSubDir>
+ <targetTestSourcesSubDir>generated-sources/lightproto/java</targetTestSourcesSubDir>
</configuration>
<executions>
<execution>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 17bb041..8db0b1d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -340,6 +340,11 @@ public class PulsarService implements AutoCloseable {
state = State.Closing;
// close the service in reverse order v.s. in which they are started
+ if (this.resourceUsageTransportManager != null) {
+ this.resourceUsageTransportManager.close();
+ this.resourceUsageTransportManager = null;
+ }
+
if (this.webService != null) {
try {
this.webService.close();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManager.java
index f7064e4..227bce0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManager.java
@@ -66,7 +66,7 @@ public class ResourceUsageTransportManager implements AutoCloseable {
final int sendTimeoutSecs = 10;
return pulsarClient.newProducer()
- .topic(pulsarService.getConfig().getResourceUsageTransportPublishTopicName())
+ .topic(RESOURCE_USAGE_TOPIC_NAME)
.batchingMaxPublishDelay(publishDelayMilliSecs, TimeUnit.MILLISECONDS)
.sendTimeout(sendTimeoutSecs, TimeUnit.SECONDS)
.blockIfQueueFull(false)
@@ -113,11 +113,12 @@ public class ResourceUsageTransportManager implements AutoCloseable {
private class ResourceUsageReader implements ReaderListener<byte[]>, AutoCloseable {
private final ResourceUsageInfo recdUsageInfo = new ResourceUsageInfo();
+
private final Reader<byte[]> consumer;
public ResourceUsageReader() throws PulsarClientException {
consumer = pulsarClient.newReader()
- .topic(pulsarService.getConfig().getResourceUsageTransportPublishTopicName())
+ .topic(RESOURCE_USAGE_TOPIC_NAME)
.startMessageId(MessageId.latest)
.readerListener(this)
.create();
@@ -130,9 +131,19 @@ public class ResourceUsageTransportManager implements AutoCloseable {
@Override
public void received(Reader<byte[]> reader, Message<byte[]> msg) {
- try {
- recdUsageInfo.parseFrom(Unpooled.wrappedBuffer(msg.getData()), msg.getData().length);
+ long publishTime = msg.getPublishTime();
+ long currentTime = System.currentTimeMillis();
+ long timeDelta = currentTime - publishTime;
+ recdUsageInfo.parseFrom(Unpooled.wrappedBuffer(msg.getData()), msg.getData().length);
+ if (timeDelta > TimeUnit.SECONDS.toMillis(
+ 2 * pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs())) {
+ LOG.error("Stale resource usage msg from broker {} publish time {} current time{}",
+ recdUsageInfo.getBroker(), publishTime, currentTime);
+ staleMessageCount++;
+ return;
+ }
+ try {
recdUsageInfo.getUsageMapsList().forEach(ru -> {
ResourceUsageConsumer owner = consumerMap.get(ru.getOwner());
if (owner != null) {
@@ -150,6 +161,7 @@ public class ResourceUsageTransportManager implements AutoCloseable {
}
private static final Logger LOG = LoggerFactory.getLogger(ResourceUsageTransportManager.class);
+ public static final String RESOURCE_USAGE_TOPIC_NAME = "non-persistent://pulsar/system/resource-usage";
private final PulsarService pulsarService;
private final PulsarClient pulsarClient;
private final ResourceUsageWriterTask pTask;
@@ -159,9 +171,11 @@ public class ResourceUsageTransportManager implements AutoCloseable {
private final Map<String, ResourceUsageConsumer>
consumerMap = new ConcurrentHashMap<String, ResourceUsageConsumer>();
+ private long staleMessageCount = 0;
+
private void createTenantAndNamespace() throws PulsarServerException, PulsarAdminException {
// Create a public tenant and default namespace
- TopicName topicName = TopicName.get(pulsarService.getConfig().getResourceUsageTransportPublishTopicName());
+ TopicName topicName = TopicName.get(RESOURCE_USAGE_TOPIC_NAME);
PulsarAdmin admin = pulsarService.getAdminClient();
ServiceConfiguration config = pulsarService.getConfig();
@@ -172,12 +186,26 @@ public class ResourceUsageTransportManager implements AutoCloseable {
List<String> tenantList = admin.tenants().getTenants();
if (!tenantList.contains(tenant)) {
- admin.tenants().createTenant(tenant,
- new TenantInfo(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
+ try {
+ admin.tenants().createTenant(tenant,
+ new TenantInfo(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
+ } catch (PulsarAdminException ex1) {
+ if (!(ex1 instanceof PulsarAdminException.ConflictException)) {
+ LOG.error("Unexpected exception {} when creating tenant {}", ex1, tenant);
+ throw ex1;
+ }
+ }
}
List<String> nsList = admin.namespaces().getNamespaces(tenant);
if (!nsList.contains(namespace)) {
- admin.namespaces().createNamespace(namespace);
+ try {
+ admin.namespaces().createNamespace(namespace);
+ } catch (PulsarAdminException ex1) {
+ if (!(ex1 instanceof PulsarAdminException.ConflictException)) {
+ LOG.error("Unexpected exception {} when creating namespace {}", ex1, namespace);
+ throw ex1;
+ }
+ }
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
index 4c1f186..89246d8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
@@ -34,7 +34,6 @@ import static org.testng.Assert.assertNotNull;
public class ResourceUsageTransportManagerTest extends MockedPulsarServiceBaseTest {
- private static final String INTERNAL_TOPIC = "non-persistent://pulsar-test/test/resource-usage";
private static final int PUBLISH_INTERVAL_SECS = 1;
@BeforeClass
@@ -53,11 +52,10 @@ public class ResourceUsageTransportManagerTest extends MockedPulsarServiceBaseTe
@Test
public void testNamespaceCreation() throws Exception {
ResourceUsageTransportManager tManager = new ResourceUsageTransportManager(pulsar);
- TopicName topicName = TopicName.get(INTERNAL_TOPIC);
+ TopicName topicName = TopicName.get(ResourceUsageTransportManager.RESOURCE_USAGE_TOPIC_NAME);
assertTrue(admin.tenants().getTenants().contains(topicName.getTenant()));
assertTrue(admin.namespaces().getNamespaces(topicName.getTenant()).contains(topicName.getNamespace()));
-
}
@Test
@@ -116,7 +114,6 @@ public class ResourceUsageTransportManagerTest extends MockedPulsarServiceBaseTe
private void prepareData() throws PulsarAdminException {
this.conf.setResourceUsageTransportClassName("org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager");
- this.conf.setResourceUsageTransportPublishTopicName(INTERNAL_TOPIC);
this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
admin.clusters().createCluster("test", new ClusterData(pulsar.getBrokerServiceUrl()));
}