You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/07 20:36:40 UTC

[GitHub] mgodave closed pull request #1355: Remove as many Thread.sleep calls from REST endpoint

mgodave closed pull request #1355: Remove as many Thread.sleep calls from REST endpoint
URL: https://github.com/apache/incubator-pulsar/pull/1355
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not keep its
diff visible after the merge, so the diff is reproduced below:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1e7764083..9c65fc883 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -20,30 +20,29 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-
 import javax.servlet.ServletContext;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
-
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -66,9 +65,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-
 public abstract class AdminResource extends PulsarWebResource {
     private static final Logger log = LoggerFactory.getLogger(AdminResource.class);
     private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
@@ -102,6 +98,21 @@ protected void zkCreateOptimistic(String path, byte[] content) throws Exception
         ZkUtils.createFullPathOptimistic(globalZk(), path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
+    protected CompletableFuture<Void> zkAsyncCreateOptimistic(String path, byte[] content) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ZkUtils.asyncCreateFullPathOptimistic(globalZk(), path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
+            (rc, path1, ctx, name) -> {
+                KeeperException.Code code = KeeperException.Code.get(rc);
+                if (code != KeeperException.Code.OK) {
+                    KeeperException e = KeeperException.create(code);
+                    future.completeExceptionally(e);
+                } else {
+                    future.complete(null);
+                }
+            }, null);
+        return future;
+    }
+
     /**
      * Get the domain of the topic (whether it's persistent or non-persistent)
      */
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9386df5ae..5dc142595 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -329,7 +329,7 @@ protected void internalRevokePermissionsOnTopic(String role) {
         }
     }
 
-    protected void internalCreatePartitionedTopic(int numPartitions, boolean authoritative) {
+    protected CompletableFuture<Void> internalCreatePartitionedTopic(int numPartitions, boolean authoritative) {
         validateAdminAccessOnProperty(topicName.getProperty());
         if (numPartitions <= 1) {
             throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
@@ -338,16 +338,18 @@ protected void internalCreatePartitionedTopic(int numPartitions, boolean authori
             String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
                     topicName.getEncodedLocalName());
             byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
-            zkCreateOptimistic(path, data);
-            // we wait for the data to be synced in all quorums and the observers
-            Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
-            log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "Partitioned topic already exist");
+            return zkAsyncCreateOptimistic(path, data).handle((ignore, e) -> {
+                if (null == e) {
+                    log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
+                    return null;
+                } else {
+                    log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
+                    throw new RestException(Status.CONFLICT, "Partitioned topic already exist");
+                }
+            });
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
+            return FutureUtil.failedFuture(new RestException(e));
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 0f9ffe144..60565fff7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -28,7 +28,9 @@
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
 import com.google.common.collect.Lists;
@@ -116,26 +118,27 @@ public PersistentTopicInternalStats getInternalStats(@PathParam("property") Stri
             @ApiResponse(code = 409, message = "Partitioned topic already exist") })
     public void createPartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            int numPartitions, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+            int numPartitions, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, AsyncResponse response) {
         validateTopicName(property, cluster, namespace, encodedTopic);
         validateAdminAccessOnProperty(topicName.getProperty());
         if (numPartitions <= 1) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
+            response.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1"));
         }
         try {
             String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
                     topicName.getEncodedLocalName());
             byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
-            zkCreateOptimistic(path, data);
-            // we wait for the data to be synced in all quorums and the observers
-            Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
-            log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "Partitioned topic already exist");
+            zkAsyncCreateOptimistic(path, data).thenAccept(ignore -> {
+                log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
+                response.resume(Response.ok());
+            }).exceptionally(e -> {
+                log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
+                response.resume(new RestException(Status.CONFLICT, "Partitioned topic already exist"));
+                return null;
+            });
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
+            response.resume(new RestException(e));
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 8e105424c..5f2249f4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -130,9 +130,16 @@ public void revokePermissionsOnTopic(@PathParam("property") String property,
             @ApiResponse(code = 409, message = "Partitioned topic already exist") })
     public void createPartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            int numPartitions, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+            int numPartitions, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, AsyncResponse response) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalCreatePartitionedTopic(numPartitions, authoritative);
+        internalCreatePartitionedTopic(numPartitions, authoritative).handle((ignore, e) -> {
+            if (e != null) {
+                response.resume(e);
+            } else {
+                response.resume(Response.noContent());
+            }
+            return null;
+        });
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 493030b7f..084ed1263 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -29,7 +29,9 @@
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
 import org.apache.pulsar.broker.service.Topic;
@@ -39,7 +41,6 @@
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,26 +104,27 @@ public PersistentTopicInternalStats getInternalStats(@PathParam("property") Stri
             @ApiResponse(code = 409, message = "Partitioned topic already exist") })
     public void createPartitionedTopic(@PathParam("property") String property, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic, int numPartitions,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, AsyncResponse response) {
         validateTopicName(property, namespace, encodedTopic);
         validateAdminAccessOnProperty(topicName.getProperty());
         if (numPartitions <= 1) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
+            response.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1"));
         }
         try {
             String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
                     topicName.getEncodedLocalName());
             byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
-            zkCreateOptimistic(path, data);
-            // we wait for the data to be synced in all quorums and the observers
-            Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
-            log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "Partitioned topic already exist");
+            zkAsyncCreateOptimistic(path, data).thenAccept(ignore -> {
+                response.resume(Response.ok());
+                log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
+            }).exceptionally(e -> {
+                log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
+                response.resume(new RestException(Status.CONFLICT, "Partitioned topic already exist"));
+                return null;
+            });
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
+            response.resume(new RestException(e));
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 0f89bfd82..055347e1f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -126,9 +126,16 @@ public void revokePermissionsOnTopic(@PathParam("property") String property,
             @ApiResponse(code = 409, message = "Partitioned topic already exist") })
     public void createPartitionedTopic(@PathParam("property") String property, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic, int numPartitions,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, AsyncResponse response) {
         validateTopicName(property, namespace, encodedTopic);
-        internalCreatePartitionedTopic(numPartitions, authoritative);
+        internalCreatePartitionedTopic(numPartitions, authoritative).handle((ignore, e) -> {
+            if (e != null) {
+                response.resume(e);
+            } else {
+                response.resume(Response.ok());
+            }
+            return null;
+        });
     }
 
     /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services