Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/11/26 13:36:46 UTC

[GitHub] [pulsar] zymap commented on a change in pull request #8719: [Transaction] Transaction buffer in memory implementation.

zymap commented on a change in pull request #8719:
URL: https://github.com/apache/pulsar/pull/8719#discussion_r531007714



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -53,14 +150,36 @@ public TopicTransactionBuffer(PersistentTopic topic) {
 
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
+        if (!checkIfReady()) {
+            return FutureUtil.failedFuture(

Review comment:
       Maybe we can reuse the `CompletableFuture` that is declared at line 158?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
##########
@@ -0,0 +1,71 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * The implement of transaction buffer state.
+ */
+public abstract class TopicTransactionBufferState {
+
+    /**
+     * The state of the transactionMetadataStore {@link TopicTransactionBufferState}.
+     */
+    public enum State {
+        None,
+        Initializing,
+        Ready,
+        Close
+    }
+
+    private static final AtomicReferenceFieldUpdater<TopicTransactionBufferState, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(TopicTransactionBufferState.class, State.class, "state");
+
+    @SuppressWarnings("unused")
+    private volatile State state = null;
+
+    public TopicTransactionBufferState(State state) {
+        STATE_UPDATER.set(this, state);
+
+    }
+
+    protected boolean changeToReadyState() {
+        return (STATE_UPDATER.compareAndSet(this, State.Initializing, State.Ready));
+    }
+
+    protected boolean changeToInitializingState() {
+        return STATE_UPDATER.compareAndSet(this, State.None, State.Initializing);
+    }
+
+    protected boolean changeToCloseState() {
+        return (STATE_UPDATER.compareAndSet(this, State.Ready, State.Close)

Review comment:
       Why not use `STATE_UPDATER.set(this, State.Close)`?
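
For context on the question above, here is a minimal sketch of how the two calls differ. `BufferStateSketch` and its method names are hypothetical stand-ins, not the PR's code, and the quoted diff line is cut off, so the full close condition in the PR is not visible here. `compareAndSet` only moves `Ready` to `Close` and reports whether it did; `set` ends in `Close` from any state.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical stand-in for the state holder in the diff; names are illustrative only.
class BufferStateSketch {
    enum State { None, Initializing, Ready, Close }

    private static final AtomicReferenceFieldUpdater<BufferStateSketch, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(BufferStateSketch.class, State.class, "state");

    private volatile State state;

    BufferStateSketch(State initial) {
        this.state = initial;
    }

    // Conditional close: only a Ready buffer moves to Close,
    // and the caller learns whether the transition happened.
    boolean closeIfReady() {
        return STATE_UPDATER.compareAndSet(this, State.Ready, State.Close);
    }

    // Unconditional close: ends in Close regardless of the previous state.
    void forceClose() {
        STATE_UPDATER.set(this, State.Close);
    }

    State current() {
        return state;
    }

    public static void main(String[] args) {
        BufferStateSketch buffer = new BufferStateSketch(State.Initializing);
        System.out.println(buffer.closeIfReady() + " " + buffer.current());
        buffer.forceClose();
        System.out.println(buffer.current());
    }
}
```

If close should succeed from every state, the unconditional `set` is simpler; the conditional form is mainly useful when callers need to know whether they performed the transition.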

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -69,47 +188,71 @@ public TopicTransactionBuffer(PersistentTopic topic) {
     public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(TxnID txnID, long startSequenceId) {
         return null;
     }
+
     @Override
     public CompletableFuture<Void> commitTxn(TxnID txnID, List<MessageIdData> sendMessageIdList) {
+        if (!checkIfReady()) {
+            return FutureUtil.failedFuture(
+                    new ServiceUnitNotReadyException("Topic : " + topic.getName()
+                            +  " transaction buffer haven't finish replay!"));
+        }
         if (log.isDebugEnabled()) {
             log.debug("Transaction {} commit on topic {}.", txnID.toString(), topic.getName());
         }
         CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-
-        ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L, txnID.getMostSigBits(),
-                txnID.getLeastSigBits(), getMessageIdDataList(sendMessageIdList));
-        topic.publishMessage(commitMarker, (e, ledgerId, entryId) -> {
-            if (e != null) {
-                log.error("Failed to commit for txn {}", txnID, e);
-                completableFuture.completeExceptionally(e);
-                return;
-            }
+        ConcurrentOpenHashSet<PositionImpl> positions = txnBufferCache.get(txnID);
+        if (positions != null) {
+            ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L, txnID.getMostSigBits(),
+                    txnID.getLeastSigBits(), convertPositionToMessageIdData(positions.values()));
+            topic.publishMessage(commitMarker, (e, ledgerId, entryId) -> {
+                if (e != null) {
+                    log.error("Failed to commit for txn {}", txnID, e);
+                    completableFuture.completeExceptionally(e);
+                    return;
+                }
+                txnBufferCache.remove(txnID);
+                completableFuture.complete(null);
+                positionsSort.removeAll(positions.values());
+            });
+        } else {

Review comment:
       Can we commit a transaction even if there are no messages in it?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferInMemoryTest.java
##########
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+public class TransactionBufferInMemoryTest extends BrokerTestBase {
+
+    private final static String TOPIC = "persistent://prop/ns-abc/test";
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        ServiceConfiguration configuration = getDefaultConf();
+        configuration.setTransactionCoordinatorEnabled(true);
+        super.baseSetup(configuration);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testTransactionBufferCommitAndAbort() throws Exception {
+        admin.topics().createNonPartitionedTopicAsync(TOPIC);
+        admin.topics().createSubscription(TOPIC, "test", MessageId.earliest);
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()
+                .getTopic(TopicName.get(TOPIC).toString(), true).get().get();
+        TopicTransactionBuffer topicTransactionBuffer =
+                (TopicTransactionBuffer) topic.getTransactionBuffer(true).get();
+        for (int i = 0; i < 3; i++) {

Review comment:
       Some of our tests use `Awaitility` to wait for conditions. We could use it here instead of this loop.
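
As an illustration of the poll-until-condition idea behind this suggestion, here is a dependency-free sketch. It is not Awaitility itself: `PollUntilSketch.waitUntil` is a hypothetical helper written for this example. With Awaitility the equivalent is usually spelled along the lines of `Awaitility.await().atMost(...).until(...)`, and what the test loop in the PR actually waits for is not visible in the quoted diff.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical helper; a hand-rolled stand-in for what a library such as Awaitility provides.
class PollUntilSketch {

    // Polls the condition until it holds or the timeout elapses.
    // Returns true if the condition became true, false on timeout or interruption.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long start = System.nanoTime();
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - start >= timeoutNanos) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag for the caller and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // The condition here is artificial: it becomes true on the third check.
        boolean ready = waitUntil(() -> attempts.incrementAndGet() >= 3, 2000, 10);
        System.out.println("ready=" + ready + " after " + attempts.get() + " checks");
    }
}
```

Compared with a fixed number of iterations, waiting on an explicit condition with a timeout makes the test's intent visible and fails with a clear signal when the condition never holds.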

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -53,14 +150,36 @@ public TopicTransactionBuffer(PersistentTopic topic) {
 
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
+        if (!checkIfReady()) {
+            return FutureUtil.failedFuture(
+                    new ServiceUnitNotReadyException("Topic : " + topic.getName()
+                            +  " transaction buffer haven't finish replay!"));
+        }
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
         topic.publishMessage(buffer, (e, ledgerId, entryId) -> {
             if (e != null) {
                 log.error("Failed to append buffer to txn {}", txnId, e);
                 completableFuture.completeExceptionally(e);
                 return;
             }
-            completableFuture.complete(PositionImpl.get(ledgerId, entryId));
+            ConcurrentOpenHashSet<PositionImpl> positions =
+                    txnBufferCache.computeIfAbsent(txnId, (v) -> new ConcurrentOpenHashSet<>());
+            PositionImpl position = PositionImpl.get(ledgerId, entryId);
+            positions.add(position);
+            positionsSort.add(position);
+            completableFuture.complete(position);
+            if (countToSyncPosition.incrementAndGet() == defaultCountToSyncPosition) {
+                try {
+                    PositionImpl syncPosition = positionsSort.pollFirst();
+                    if (syncPosition != null) {
+                        topic.getManagedLedger().setProperty(txnOnGoingPositionName, syncPosition.toString());
+                    }
+                } catch (ManagedLedgerException | InterruptedException exception) {
+                    log.error("Topic : [{}] Position : [{}], transaction buffer " +
+                            "sync replay position fail!", topic.getName(), position);
+                }
+                countToSyncPosition.addAndGet(defaultCountToSyncPosition);

Review comment:
       I only see `countToSyncPosition` being increased; I never see it decreased. That seems weird.
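
To make the concern concrete, here is a small sketch that mirrors the counter pattern visible in the diff and compares it with one possible alternative. `SyncCounterSketch` and its methods are hypothetical, the diff is cut off right after the `addAndGet` call, and "sync on every N-th append" is an assumption about the intent rather than something the PR states.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

// Hypothetical model of the counter logic; not the PR's class.
class SyncCounterSketch {
    private final long threshold;
    private final AtomicLong counter = new AtomicLong();

    SyncCounterSketch(long threshold) {
        this.threshold = threshold;
    }

    // Mirrors the pattern in the diff: once the counter hits the threshold it is
    // pushed further up, so it can never equal the threshold again.
    boolean shouldSyncAsInDiff() {
        if (counter.incrementAndGet() == threshold) {
            counter.addAndGet(threshold);
            return true;
        }
        return false;
    }

    // Assumed intent: fire on every threshold-th append, with no reset step needed.
    boolean shouldSyncEveryNth() {
        return counter.incrementAndGet() % threshold == 0;
    }

    // Counts how often the given check fires over a number of simulated appends.
    static int countSyncs(BooleanSupplier check, int appends) {
        int syncs = 0;
        for (int i = 0; i < appends; i++) {
            if (check.getAsBoolean()) {
                syncs++;
            }
        }
        return syncs;
    }

    public static void main(String[] args) {
        SyncCounterSketch asInDiff = new SyncCounterSketch(3);
        SyncCounterSketch everyNth = new SyncCounterSketch(3);
        System.out.println("as in diff: " + countSyncs(asInDiff::shouldSyncAsInDiff, 9));
        System.out.println("every nth:  " + countSyncs(everyNth::shouldSyncEveryNth, 9));
    }
}
```

In this model the diff's pattern triggers a sync only once, whereas the modulo variant triggers it periodically. Subtracting the threshold instead of adding it would be another way to reset the counter.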



