You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/12/06 20:27:43 UTC

[pulsar] branch master updated: Cleanup Exception thrown during error (#3133)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d76dfe2  Cleanup Exception thrown during error (#3133)
d76dfe2 is described below

commit d76dfe2e42a196f8d306982ff33bf627caff8245
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Dec 6 12:27:38 2018 -0800

    Cleanup Exception thrown during error (#3133)
    
    * Cleanup Exception thrown during error
    
    * Added more validation
---
 .../pulsar/client/admin/internal/BaseResource.java  | 14 ++++++++++++++
 .../pulsar/client/admin/internal/FunctionsImpl.java | 17 ++++++++---------
 .../pulsar/client/admin/internal/SinkImpl.java      | 21 +++++----------------
 .../pulsar/client/admin/internal/SourceImpl.java    | 20 +++++---------------
 .../pulsar/client/admin/internal/TopicsImpl.java    |  8 +-------
 .../org/apache/pulsar/admin/cli/CmdSources.java     |  3 +++
 6 files changed, 36 insertions(+), 47 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index 25d622b..8753ee1 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -31,6 +31,7 @@ import javax.ws.rs.client.Invocation.Builder;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
@@ -187,4 +188,17 @@ public abstract class BaseResource {
         }
     }
 
+    public WebApplicationException getApiException(Response response) {
+        if (response.getStatusInfo().equals(Response.Status.OK)) {
+            return null;
+        }
+        if (response.getStatus() >= 500) {
+            throw new ServerErrorException(response);
+        } else if (response.getStatus() >= 400) {
+            throw new ClientErrorException(response);
+        } else {
+            throw new WebApplicationException(response);
+        }
+    }
+
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index c651fcb..5f80593 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -38,7 +38,6 @@ import org.glassfish.jersey.media.multipart.FormDataBodyPart;
 import org.glassfish.jersey.media.multipart.FormDataMultiPart;
 import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
 
-import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.GenericType;
@@ -67,7 +66,7 @@ public class FunctionsImpl extends BaseResource implements Functions {
         try {
             Response response = request(functions.path(tenant).path(namespace)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(new GenericType<List<String>>() {
             });
@@ -81,7 +80,7 @@ public class FunctionsImpl extends BaseResource implements Functions {
         try {
              Response response = request(functions.path(tenant).path(namespace).path(function)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(FunctionConfig.class);
         } catch (Exception e) {
@@ -95,7 +94,7 @@ public class FunctionsImpl extends BaseResource implements Functions {
         try {
             Response response = request(functions.path(tenant).path(namespace).path(function).path("status")).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(FunctionStatus.class);
         } catch (Exception e) {
@@ -110,7 +109,7 @@ public class FunctionsImpl extends BaseResource implements Functions {
                     functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status"))
                             .get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class);
         } catch (Exception e) {
@@ -124,7 +123,7 @@ public class FunctionsImpl extends BaseResource implements Functions {
             Response response = request(
                     functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("stats")).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class);
         } catch (Exception e) {
@@ -138,7 +137,7 @@ public class FunctionsImpl extends BaseResource implements Functions {
             Response response = request(
                     functions.path(tenant).path(namespace).path(function).path("stats")).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(FunctionStats.class);
         } catch (Exception e) {
@@ -330,7 +329,7 @@ public class FunctionsImpl extends BaseResource implements Functions {
         try {
             Response response = request(functions.path("connectors")).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(new GenericType<List<ConnectorDefinition>>() {
             });
@@ -366,7 +365,7 @@ public class FunctionsImpl extends BaseResource implements Functions {
             Response response = request(functions.path(tenant)
                 .path(namespace).path(function).path("state").path(key)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             String value = response.readEntity(String.class);
             return new Gson().fromJson(value, new TypeToken<FunctionState>() {}.getType());
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
index 2300b62..1363d35 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
@@ -19,8 +19,6 @@
 package org.apache.pulsar.client.admin.internal;
 
 import com.google.gson.Gson;
-import com.google.protobuf.AbstractMessage.Builder;
-import com.google.protobuf.util.JsonFormat;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Sink;
@@ -28,20 +26,17 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.SinkStatus;
-import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.glassfish.jersey.media.multipart.FormDataBodyPart;
 import org.glassfish.jersey.media.multipart.FormDataMultiPart;
 import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
 
-import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.io.File;
-import java.io.IOException;
 import java.util.List;
 
 @Slf4j
@@ -59,7 +54,7 @@ public class SinkImpl extends BaseResource implements Sink {
         try {
             Response response = request(sink.path(tenant).path(namespace)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(new GenericType<List<String>>() {
             });
@@ -73,7 +68,7 @@ public class SinkImpl extends BaseResource implements Sink {
         try {
              Response response = request(sink.path(tenant).path(namespace).path(sinkName)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(SinkConfig.class);
         } catch (Exception e) {
@@ -87,7 +82,7 @@ public class SinkImpl extends BaseResource implements Sink {
         try {
             Response response = request(sink.path(tenant).path(namespace).path(sinkName).path("status")).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(SinkStatus.class);
         } catch (Exception e) {
@@ -103,7 +98,7 @@ public class SinkImpl extends BaseResource implements Sink {
                     sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(id)).path("status"))
                             .get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(SinkStatus.SinkInstanceStatus.SinkInstanceStatusData.class);
         } catch (Exception e) {
@@ -242,17 +237,11 @@ public class SinkImpl extends BaseResource implements Sink {
         try {
             Response response = request(sink.path("builtinsinks")).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(new GenericType<List<ConnectorDefinition>>() {});
         } catch (Exception e) {
             throw getApiException(e);
         }
     }
-
-
-    public static void mergeJson(String json, Builder builder) throws IOException {
-        JsonFormat.parser().merge(json, builder);
-    }
-
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
index ac330ab..d0d36a5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
@@ -19,8 +19,6 @@
 package org.apache.pulsar.client.admin.internal;
 
 import com.google.gson.Gson;
-import com.google.protobuf.AbstractMessage.Builder;
-import com.google.protobuf.util.JsonFormat;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Source;
@@ -33,14 +31,12 @@ import org.glassfish.jersey.media.multipart.FormDataBodyPart;
 import org.glassfish.jersey.media.multipart.FormDataMultiPart;
 import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
 
-import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.io.File;
-import java.io.IOException;
 import java.util.List;
 
 @Slf4j
@@ -58,7 +54,7 @@ public class SourceImpl extends BaseResource implements Source {
         try {
             Response response = request(source.path(tenant).path(namespace)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(new GenericType<List<String>>() {
             });
@@ -72,7 +68,7 @@ public class SourceImpl extends BaseResource implements Source {
         try {
              Response response = request(source.path(tenant).path(namespace).path(sourceName)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(SourceConfig.class);
         } catch (Exception e) {
@@ -86,7 +82,7 @@ public class SourceImpl extends BaseResource implements Source {
         try {
             Response response = request(source.path(tenant).path(namespace).path(sourceName).path("status")).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(SourceStatus.class);
         } catch (Exception e) {
@@ -102,7 +98,7 @@ public class SourceImpl extends BaseResource implements Source {
                     source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(id)).path("status"))
                             .get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(SourceStatus.SourceInstanceStatus.SourceInstanceStatusData.class);
         } catch (Exception e) {
@@ -241,17 +237,11 @@ public class SourceImpl extends BaseResource implements Source {
         try {
             Response response = request(source.path("builtinsources")).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw new ClientErrorException(response);
+                throw getApiException(response);
             }
             return response.readEntity(new GenericType<List<ConnectorDefinition>>() {});
         } catch (Exception e) {
             throw getApiException(e);
         }
     }
-
-
-    public static void mergeJson(String json, Builder builder) throws IOException {
-        JsonFormat.parser().merge(json, builder);
-    }
-
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index eb76f60..8e386dd 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -889,13 +889,7 @@ public class TopicsImpl extends BaseResource implements Topics, PersistentTopics
     private List<Message<byte[]>> getMessageFromHttpResponse(String topic, Response response) throws Exception {
 
         if (response.getStatus() != Status.OK.getStatusCode()) {
-            if (response.getStatus() >= 500) {
-                throw new ServerErrorException(response);
-            } else if (response.getStatus() >= 400) {
-                throw new ClientErrorException(response);
-            } else {
-                throw new WebApplicationException(response);
-            }
+            throw getApiException(response);
         }
 
         String msgId = response.getHeaderString("X-Pulsar-Message-ID");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 4100c6b..9ec950a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -391,6 +391,9 @@ public class CmdSources extends CmdBase {
                     throw new IllegalArgumentException(String.format("Source Archive %s does not exist", sourceConfig.getArchive()));
                 }
             }
+            if (isBlank(sourceConfig.getName())) {
+                throw new IllegalArgumentException("Source name not specified");
+            }
         }
 
         protected String validateSourceType(String sourceType) throws IOException {