You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/28 17:28:08 UTC

[GitHub] [pulsar] tisonkun commented on a diff in pull request #5680: [transaction-coordinator] Implementation of transaction coordinator client.

tisonkun commented on code in PR #5680:
URL: https://github.com/apache/pulsar/pull/5680#discussion_r982683198


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java:
##########
@@ -18,42 +18,209 @@
  */
 package org.apache.pulsar.client.impl.transaction;
 
-import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.CoordinatorClientStateException;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
+import org.apache.pulsar.client.util.MathUtils;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 /**
- * The implementation of {@link TransactionCoordinatorClient}.
+ * Transaction coordinator client based topic assigned.
  */
 public class TransactionCoordinatorClientImpl implements TransactionCoordinatorClient {
 
-    private final PulsarClientImpl client;
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionCoordinatorClientImpl.class);
+
+    private final PulsarClientImpl pulsarClient;
+    private TransactionMetaStoreHandler[] handlers;
+    private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap = new ConcurrentLongHashMap<>(16, 1);
+    private final AtomicLong epoch = new AtomicLong(0);
+
+    private static final AtomicReferenceFieldUpdater<TransactionCoordinatorClientImpl, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(TransactionCoordinatorClientImpl.class, State.class, "state");
+    private volatile State state = State.NONE;
+
+    public TransactionCoordinatorClientImpl(PulsarClient pulsarClient) {
+        this.pulsarClient = (PulsarClientImpl) pulsarClient;
+    }
+
+    @Override
+    public void start() throws TransactionCoordinatorClientException {
+        try {
+            startAsync().get();
+        } catch (Exception e) {
+            throw TransactionCoordinatorClientException.unwrap(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> startAsync() {
+        if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
+            return pulsarClient.getLookup().getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN)
+                .thenAccept(partitionMeta -> {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Transaction meta store assign partition is {}.", partitionMeta.partitions);
+                    }
+                    if (partitionMeta.partitions > 0) {
+                        handlers = new TransactionMetaStoreHandler[partitionMeta.partitions];
+                        for (int i = 0; i < partitionMeta.partitions; i++) {
+                            TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(i, pulsarClient,
+                                    TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + i);
+                            handlers[i] = handler;
+                            handlerMap.put(i, handler);
+                        }
+                    } else {
+                        handlers = new TransactionMetaStoreHandler[1];
+                        TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(0, pulsarClient,
+                                TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+                        handlers[0] = handler;
+                        handlerMap.put(0, handler);
+                    }
+
+                    STATE_UPDATER.set(TransactionCoordinatorClientImpl.this, State.READY);
+
+                });
+        } else {
+            return FutureUtil.failedFuture(new CoordinatorClientStateException("Can not start while current state is " + state));
+        }
+    }
+
+    @Override
+    public void close() throws TransactionCoordinatorClientException {
+        try {
+            closeAsync().get();
+        } catch (Exception e) {
+            throw TransactionCoordinatorClientException.unwrap(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        if (getState() == State.CLOSING || getState() == State.CLOSED) {

Review Comment:
   @codelipenghui It seems the state can be transformed only:
   
   * FROM `NONE` to `STARTING` in L73.
   * FROM `STARTING` to `READY` in L95.
   
   ... never using `CLOSING` or `CLOSED`. It keeps even in the latest master. Any thoughts here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org