You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/05/22 10:18:17 UTC

[pulsar] branch master updated: Fix break changes in namespace offload policy and RabbitMQ sink. (#7011)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new add2eae  Fix break changes in namespace offload policy and RabbitMQ sink. (#7011)
add2eae is described below

commit add2eae1295e878a229efea60439eb32f27f40b2
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri May 22 18:17:59 2020 +0800

    Fix break changes in namespace offload policy and RabbitMQ sink. (#7011)
    
    ### Motivation
    
    Fix breaking changes in the namespace offload policy and the RabbitMQ sink.
---
 .../workflows/ci-integration-tiered-filesystem.yaml |  8 ++++++++
 .github/workflows/ci-integration-tiered-jcloud.yaml |  8 ++++++++
 .../pulsar/broker/admin/impl/NamespacesBase.java    | 21 +++++++++++++++++----
 .../apache/pulsar/broker/service/BrokerService.java | 11 -----------
 .../pulsar/io/rabbitmq/RabbitMQAbstractConfig.java  |  6 ++++++
 .../org/apache/pulsar/io/rabbitmq/RabbitMQSink.java | 15 +++++++++++++--
 .../pulsar/io/rabbitmq/RabbitMQSinkConfig.java      |  6 ++++++
 7 files changed, 58 insertions(+), 17 deletions(-)

diff --git a/.github/workflows/ci-integration-tiered-filesystem.yaml b/.github/workflows/ci-integration-tiered-filesystem.yaml
index cd48bc4..472cbce 100644
--- a/.github/workflows/ci-integration-tiered-filesystem.yaml
+++ b/.github/workflows/ci-integration-tiered-filesystem.yaml
@@ -62,6 +62,14 @@ jobs:
         if: steps.docs.outputs.changed_only == 'no'
         run: mvn clean install -DskipTests
 
+      - name: build pulsar image
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
+
+      - name: build pulsar-all image
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
+
       - name: build artifacts and docker image
         if: steps.docs.outputs.changed_only == 'no'
         run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests
diff --git a/.github/workflows/ci-integration-tiered-jcloud.yaml b/.github/workflows/ci-integration-tiered-jcloud.yaml
index af14f43..1594452 100644
--- a/.github/workflows/ci-integration-tiered-jcloud.yaml
+++ b/.github/workflows/ci-integration-tiered-jcloud.yaml
@@ -62,6 +62,14 @@ jobs:
         if: steps.docs.outputs.changed_only == 'no'
         run: mvn clean install -DskipTests
 
+      - name: build pulsar image
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
+
+      - name: build pulsar-all image
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
+
       - name: build artifacts and docker image
         if: steps.docs.outputs.changed_only == 'no'
         run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4164db2..4c3ce04 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -50,6 +50,8 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -2408,9 +2410,14 @@ public abstract class NamespacesBase extends AdminResource {
             byte[] content = globalZk().getData(path, null, nodeStat);
 
             Policies policies = jsonMapper().readValue(content, Policies.class);
-            if (policies.offload_policies != null) {
-                policies.offload_policies.setManagedLedgerOffloadThresholdInBytes(newThreshold);
+            if (policies.offload_policies == null) {
+                OffloadPolicies defaultPolicy = pulsar().getDefaultOffloader().getOffloadPolicies();
+                policies.offload_policies = defaultPolicy == null ? new OffloadPolicies() : defaultPolicy;
+                if (policies.offload_deletion_lag_ms != null) {
+                    policies.offload_policies.setManagedLedgerOffloadDeletionLagInMillis(policies.offload_deletion_lag_ms);
+                }
             }
+            policies.offload_policies.setManagedLedgerOffloadThresholdInBytes(newThreshold);
             policies.offload_threshold = newThreshold;
 
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
@@ -2455,9 +2462,15 @@ public abstract class NamespacesBase extends AdminResource {
             byte[] content = globalZk().getData(path, null, nodeStat);
 
             Policies policies = jsonMapper().readValue(content, Policies.class);
-            if (policies.offload_policies != null) {
-                policies.offload_policies.setManagedLedgerOffloadDeletionLagInMillis(newDeletionLagMs);
+
+            if (policies.offload_policies == null) {
+                OffloadPolicies defaultPolicy = pulsar().getDefaultOffloader().getOffloadPolicies();
+                policies.offload_policies = defaultPolicy == null ? new OffloadPolicies() : defaultPolicy;
+                if (policies.offload_threshold != -1) {
+                    policies.offload_policies.setManagedLedgerOffloadThresholdInBytes(policies.offload_threshold);
+                }
             }
+            policies.offload_policies.setManagedLedgerOffloadDeletionLagInMillis(newDeletionLagMs);
             policies.offload_deletion_lag_ms = newDeletionLagMs;
 
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 490b6f8..7001523 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1062,17 +1062,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
             managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
             OffloadPolicies offloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
-
-            if (offloadPolicies == null) {
-                offloadPolicies = new OffloadPolicies();
-                offloadPolicies.setManagedLedgerOffloadDriver(pulsar.getConfiguration().getManagedLedgerOffloadDriver());
-                offloadPolicies.setManagedLedgerOffloadThresholdInBytes(
-                        pulsar.getConfiguration().getManagedLedgerOffloadAutoTriggerSizeThresholdBytes()
-                );
-                offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(
-                        pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()
-                );
-            }
             managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
 
             future.complete(managedLedgerConfig);
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java
index a5a210e..065392d 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java
@@ -75,6 +75,12 @@ public class RabbitMQAbstractConfig implements Serializable {
     private String password = "guest";
 
     @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "The RabbitMQ queue name from which messages should be read from or written to")
+    private String queueName;
+
+    @FieldDoc(
         required = false,
         defaultValue = "0",
         help = "Initially requested maximum channel number. 0 for unlimited")
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
index 3166ba2..38437c7 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
@@ -23,6 +23,7 @@ import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
@@ -49,6 +50,7 @@ public class RabbitMQSink implements Sink<byte[]> {
     private Channel rabbitMQChannel;
     private RabbitMQSinkConfig rabbitMQSinkConfig;
     private String exchangeName;
+    private String defaultRoutingKey;
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
@@ -63,17 +65,26 @@ public class RabbitMQSink implements Sink<byte[]> {
         );
 
         exchangeName = rabbitMQSinkConfig.getExchangeName();
+        defaultRoutingKey = rabbitMQSinkConfig.getRoutingKey();
         String exchangeType = rabbitMQSinkConfig.getExchangeType();
 
         rabbitMQChannel = rabbitMQConnection.createChannel();
-        rabbitMQChannel.exchangeDeclare(exchangeName, exchangeType, true);
+        String queueName = rabbitMQSinkConfig.getQueueName();
+        if (StringUtils.isNotEmpty(queueName)) {
+            rabbitMQChannel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
+            rabbitMQChannel.queueDeclare(rabbitMQSinkConfig.getQueueName(), true, false, false, null);
+            rabbitMQChannel.queueBind(rabbitMQSinkConfig.getQueueName(), exchangeName, defaultRoutingKey);
+        } else {
+            rabbitMQChannel.exchangeDeclare(exchangeName, exchangeType, true);
+        }
     }
 
     @Override
     public void write(Record<byte[]> record) {
         byte[] value = record.getValue();
         try {
-            rabbitMQChannel.basicPublish(exchangeName, record.getProperties().get("routingKey"), null, value);
+            String routingKey = record.getProperties().get("routingKey");
+            rabbitMQChannel.basicPublish(exchangeName, StringUtils.isEmpty(routingKey) ? defaultRoutingKey : routingKey, null, value);
             record.ack();
         } catch (IOException e) {
             record.fail();
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
index a276d4f..7477b98 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
@@ -45,6 +45,12 @@ public class RabbitMQSinkConfig extends RabbitMQAbstractConfig implements Serial
     private String exchangeName;
 
     @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "The routing key used for publishing the messages")
+    private String routingKey;
+
+    @FieldDoc(
         required = false,
         defaultValue = "topic",
         help = "The exchange type to publish the messages on")