You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/26 20:42:58 UTC

[incubator-pulsar] branch master updated: Compaction uses GetLastMessageId (#1286)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new da64d0c  Compaction uses GetLastMessageId (#1286)
da64d0c is described below

commit da64d0cdfbd320c2364d37b4d37d918889cda8e7
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Mon Feb 26 21:42:55 2018 +0100

    Compaction uses GetLastMessageId (#1286)
    
    In two phase compaction, we read from the start of the topic to the
    end. Previously, we had no reliable way of knowing if we were at the
    end of the topic, so we used timeouts.
    
    However, with the new GetLastMessageId api, we can find how far we
    should read in a topic, and just stop when we get there.
---
 .../org/apache/pulsar/client/api/RawReader.java    |   6 ++
 .../apache/pulsar/client/impl/RawReaderImpl.java   |   5 +
 .../pulsar/compaction/TwoPhaseCompactor.java       |  45 ++++----
 .../apache/pulsar/client/impl/RawReaderTest.java   | 114 ++++++++++-----------
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   2 +-
 5 files changed, 92 insertions(+), 80 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index b5a4a65..15b03be 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.api;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.RawReaderImpl;
@@ -61,6 +62,11 @@ public interface RawReader {
     CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String,Long> properties);
 
     /**
+     * Get the id of the last message immediately available for reading.
+     */
+    CompletableFuture<MessageId> getLastMessageIdAsync();
+
+    /**
      * Close the raw reader.
      */
     CompletableFuture<Void> closeAsync();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 04aa610..a048a5b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -78,6 +78,11 @@ public class RawReaderImpl implements RawReader {
         return consumer.closeAsync();
     }
 
+    @Override
+    public CompletableFuture<MessageId> getLastMessageIdAsync() {
+        return consumer.getLastMessageIdAsync();
+    }
+
     static class RawConsumerImpl extends ConsumerImpl {
         final BlockingQueue<RawMessageAndCnx> incomingRawMessages;
         final Queue<CompletableFuture<RawMessage>> pendingRawReceives;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 957e197..10dc8b1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -75,15 +75,22 @@ public class TwoPhaseCompactor extends Compactor {
 
     private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
         Map<String,MessageId> latestForKey = new HashMap<>();
-
         CompletableFuture<PhaseOneResult> loopPromise = new CompletableFuture<>();
-        phaseOneLoop(reader, Optional.empty(), Optional.empty(), latestForKey, loopPromise);
+
+        reader.getLastMessageIdAsync().whenComplete(
+                (lastMessageId, exception) -> {
+                    if (exception != null) {
+                        loopPromise.completeExceptionally(exception);
+                    } else {
+                        phaseOneLoop(reader, Optional.empty(), lastMessageId, latestForKey, loopPromise);
+                    }
+                });
         return loopPromise;
     }
 
     private void phaseOneLoop(RawReader reader,
                               Optional<MessageId> firstMessageId,
-                              Optional<MessageId> lastMessageId,
+                              MessageId lastMessageId,
                               Map<String,MessageId> latestForKey,
                               CompletableFuture<PhaseOneResult> loopPromise) {
         if (loopPromise.isDone()) {
@@ -91,34 +98,30 @@ public class TwoPhaseCompactor extends Compactor {
         }
         CompletableFuture<RawMessage> future = reader.readNextAsync();
         scheduleTimeout(future);
-        future.whenComplete(
+        future.whenCompleteAsync(
                 (m, exception) -> {
                     try {
                         if (exception != null) {
-                            if (exception instanceof TimeoutException
-                                && firstMessageId.isPresent()) {
-                                loopPromise.complete(new PhaseOneResult(firstMessageId.get(),
-                                                                        lastMessageId.get(),
-                                                                        latestForKey));
-                            } else {
-                                loopPromise.completeExceptionally(exception);
-                            }
+                            loopPromise.completeExceptionally(exception);
                             return;
                         }
-
                         MessageId id = m.getMessageId();
                         String key = extractKey(m);
                         latestForKey.put(key, id);
 
-                        phaseOneLoop(reader,
-                                     Optional.of(firstMessageId.orElse(id)),
-                                     Optional.of(id),
-                                     latestForKey, loopPromise);
+                        if (id.compareTo(lastMessageId) == 0) {
+                            loopPromise.complete(new PhaseOneResult(firstMessageId.orElse(id),
+                                                                    id, latestForKey));
+                        } else {
+                            phaseOneLoop(reader,
+                                         Optional.of(firstMessageId.orElse(id)),
+                                         lastMessageId,
+                                         latestForKey, loopPromise);
+                        }
                     } finally {
                         m.close();
                     }
-                });
-
+                }, scheduler);
     }
 
     private void scheduleTimeout(CompletableFuture<RawMessage> future) {
@@ -168,7 +171,7 @@ public class TwoPhaseCompactor extends Compactor {
 
     private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> latestForKey,
                               LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise) {
-        reader.readNextAsync().whenComplete(
+        reader.readNextAsync().whenCompleteAsync(
                 (m, exception) -> {
                     try {
                         if (exception != null) {
@@ -205,7 +208,7 @@ public class TwoPhaseCompactor extends Compactor {
                     } finally {
                         m.close();
                     }
-                });
+                }, scheduler);
     }
 
     private CompletableFuture<LedgerHandle> createLedger(BookKeeper bk) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index 19cdc31..53da2ee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -131,16 +131,16 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
         Set<String> keys = publishMessages(topic, numKeys);
 
         RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
-        try {
-            while (true) { // should break out with TimeoutException
-                try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
-                    Assert.assertTrue(keys.remove(extractKey(m)));
+
+        MessageId lastMessageId = reader.getLastMessageIdAsync().get();
+        while (true) {
+            try (RawMessage m = reader.readNextAsync().get()) {
+                Assert.assertTrue(keys.remove(extractKey(m)));
+                if (lastMessageId.compareTo(m.getMessageId()) == 0) {
+                    break;
                 }
             }
-        } catch (TimeoutException te) {
-            // ok
         }
-
         Assert.assertTrue(keys.isEmpty());
     }
 
@@ -153,28 +153,27 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
 
         Set<String> readKeys = new HashSet<>();
         RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
-        try {
-            while (true) { // should break out with TimeoutException
-                try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
-                    readKeys.add(extractKey(m));
+        MessageId lastMessageId = reader.getLastMessageIdAsync().get();
+        while (true) {
+            try (RawMessage m = reader.readNextAsync().get()) {
+                readKeys.add(extractKey(m));
+                if (lastMessageId.compareTo(m.getMessageId()) == 0) {
+                    break;
                 }
             }
-        } catch (TimeoutException te) {
-            // ok
         }
         Assert.assertEquals(readKeys.size(), numKeys);
 
         // seek to start, read all keys again,
         // assert that we read all keys we had read previously
         reader.seekAsync(MessageId.earliest).get();
-        try {
-            while (true) { // should break out with TimeoutException
-                try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
-                    Assert.assertTrue(readKeys.remove(extractKey(m)));
+        while (true) {
+            try (RawMessage m = reader.readNextAsync().get()) {
+                Assert.assertTrue(readKeys.remove(extractKey(m)));
+                if (lastMessageId.compareTo(m.getMessageId()) == 0) {
+                    break;
                 }
             }
-        } catch (TimeoutException te) {
-            // ok
         }
         Assert.assertTrue(readKeys.isEmpty());
     }
@@ -190,34 +189,34 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
         RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
         int i = 0;
         MessageId seekTo = null;
-        try {
-            while (true) { // should break out with TimeoutException
-                try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
-                    i++;
-                    if (i > numKeys/2) {
-                        if (seekTo == null) {
-                            seekTo = m.getMessageId();
-                        }
-                        readKeys.add(extractKey(m));
+        MessageId lastMessageId = reader.getLastMessageIdAsync().get();
+
+        while (true) {
+            try (RawMessage m = reader.readNextAsync().get()) {
+                i++;
+                if (i > numKeys/2) {
+                    if (seekTo == null) {
+                        seekTo = m.getMessageId();
                     }
+                    readKeys.add(extractKey(m));
+                }
+                if (lastMessageId.compareTo(m.getMessageId()) == 0) {
+                    break;
                 }
             }
-        } catch (TimeoutException te) {
-            // ok
         }
         Assert.assertEquals(readKeys.size(), numKeys/2);
 
         // seek to middle, read all keys again,
         // assert that we read all keys we had read previously
         reader.seekAsync(seekTo).get();
-        try {
-            while (true) { // should break out with TimeoutException
-                try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
-                    Assert.assertTrue(readKeys.remove(extractKey(m)));
+        while (true) { // breaks out once the message with lastMessageId has been read
+            try (RawMessage m = reader.readNextAsync().get()) {
+                Assert.assertTrue(readKeys.remove(extractKey(m)));
+                if (lastMessageId.compareTo(m.getMessageId()) == 0) {
+                    break;
                 }
             }
-        } catch (TimeoutException te) {
-            // ok
         }
         Assert.assertTrue(readKeys.isEmpty());
     }
@@ -280,18 +279,18 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
                     }
                 }
             };
-        try {
-            while (true) { // should break out with TimeoutException
-                try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
-                    if (RawBatchConverter.isBatch(m)) {
-                        RawBatchConverter.explodeBatch(m).forEach(consumer);
-                    } else {
-                        consumer.accept(m);
-                    }
+        MessageId lastMessageId = reader.getLastMessageIdAsync().get();
+        while (true) {
+            try (RawMessage m = reader.readNextAsync().get()) {
+                if (RawBatchConverter.isBatch(m)) {
+                    RawBatchConverter.explodeBatch(m).forEach(consumer);
+                } else {
+                    consumer.accept(m);
+                }
+                if (lastMessageId.compareTo(m.getMessageId()) == 0) {
+                    break;
                 }
             }
-        } catch (TimeoutException te) {
-            // ok
         }
         Assert.assertTrue(keys.isEmpty());
     }
@@ -304,24 +303,23 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
 
         Set<String> keys = publishMessages(topic, numKeys);
 
-        MessageId lastMessageId = null;
         RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
-        try {
-            while (true) { // should break out with TimeoutException
-                try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
-                    lastMessageId = m.getMessageId();
-                    Assert.assertTrue(keys.remove(extractKey(m)));
+        MessageId lastMessageId = reader.getLastMessageIdAsync().get();
+
+        while (true) {
+            try (RawMessage m = reader.readNextAsync().get()) {
+                Assert.assertTrue(keys.remove(extractKey(m)));
+
+                if (lastMessageId.compareTo(m.getMessageId()) == 0) {
+                    break;
                 }
             }
-        } catch (TimeoutException te) {
-            // ok
         }
-
         Assert.assertTrue(keys.isEmpty());
 
         Map<String,Long> properties = new HashMap<>();
         properties.put("foobar", 0xdeadbeefdecaL);
-        reader.acknowledgeCumulativeAsync(lastMessageId, properties).get(5, TimeUnit.SECONDS);
+        reader.acknowledgeCumulativeAsync(lastMessageId, properties).get();
 
         PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic);
         ManagedLedger ledger = topicRef.getManagedLedger();
@@ -349,12 +347,12 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
         }
 
         for (int i = 0; i < numKeys/2; i++) {
-            futures.remove(0).get(5, TimeUnit.SECONDS); // complete successfully
+            futures.remove(0).get(); // complete successfully
         }
         reader.closeAsync().get();
         while (!futures.isEmpty()) {
             try {
-                futures.remove(0).get(5, TimeUnit.SECONDS);
+                futures.remove(0).get();
                 Assert.fail("Should have been cancelled");
             } catch (CancellationException ee) {
                 // correct behaviour
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e9c7817..4e526cc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1315,7 +1315,7 @@ public class ConsumerImpl extends ConsumerBase {
         return booleanFuture;
     }
 
-    private CompletableFuture<MessageId> getLastMessageIdAsync() {
+    CompletableFuture<MessageId> getLastMessageIdAsync() {
         if (getState() == State.Closing || getState() == State.Closed) {
             return FutureUtil
                 .failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.