You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/07 18:39:43 UTC

[GitHub] srkukarni closed pull request #3137: Make Source/Sink status Source/Sink specific

srkukarni closed pull request #3137: Make Source/Sink status Source/Sink specific
URL: https://github.com/apache/pulsar/pull/3137
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java
index 72cf7362a8..609f1071e4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java
@@ -27,7 +27,9 @@
 
 @Data
 public class SinkStatus {
+    // The total number of sink instances that ought to be running
     public int numInstances;
+    // The number of source instances that are actually running
     public int numRunning;
     public List<SinkInstanceStatus> instances = new LinkedList<>();
 
@@ -38,20 +40,29 @@
 
         @Data
         public static class SinkInstanceStatusData {
-
+            // Is this instance running?
             public boolean running;
 
+            // Do we have any error while running this instance
             public String error;
 
+            // Number of times this instance has restarted
             public long numRestarts;
 
-            public long numReceived;
+            // Number of messages read from Pulsar
+            public long numReadFromPulsar;
 
+            // Number of times there was a system exception handling messages
             public long numSystemExceptions;
 
+            // A list of the most recent system exceptions
             public List<ExceptionInformation> latestSystemExceptions;
 
-            public long lastInvocationTime;
+            // Number of messages written to sink
+            public long numWrittenToSink;
+
+            // When was the last time we received a message from Pulsar
+            public long lastReceivedTime;
 
             public String workerId;
         }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java
index 1ea8a801c0..4043900421 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java
@@ -27,7 +27,9 @@
 
 @Data
 public class SourceStatus {
+    // The total number of source instances that ought to be running
     public int numInstances;
+    // The number of source instances that are actually running
     public int numRunning;
     public List<SourceInstanceStatus> instances = new LinkedList<>();
 
@@ -38,21 +40,31 @@
 
         @Data
         public static class SourceInstanceStatusData {
-
+            // Is this instance running?
             public boolean running;
 
+            // Do we have any error while running this instance
             public String error;
 
+            // Number of times this instance has restarted
             public long numRestarts;
 
-            public long numReceived;
+            // Number of messages received from source
+            public long numReceivedFromSource;
 
+            // Number of times there was a system exception handling messages
             public long numSystemExceptions;
 
+            // A list of the most recent system exceptions
             public List<ExceptionInformation> latestSystemExceptions;
 
-            public long lastInvocationTime;
+            // Number of messages written into pulsar
+            public long numWritten;
+
+            // When was the last time we received a message from the source
+            public long lastReceivedTime;
 
+            // The worker id on which the source is running
             public String workerId;
         }
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 4ecba12964..45bbba4436 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -60,7 +60,7 @@
             sinkInstanceStatusData.setRunning(status.getRunning());
             sinkInstanceStatusData.setError(status.getFailureException());
             sinkInstanceStatusData.setNumRestarts(status.getNumRestarts());
-            sinkInstanceStatusData.setNumReceived(status.getNumReceived());
+            sinkInstanceStatusData.setNumReadFromPulsar(status.getNumReceived());
 
             List<ExceptionInformation> userExceptionInformationList = new LinkedList<>();
             for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
@@ -82,7 +82,8 @@
             }
             sinkInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
 
-            sinkInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
+            sinkInstanceStatusData.setNumWrittenToSink(status.getNumSuccessfullyProcessed());
+            sinkInstanceStatusData.setLastReceivedTime(status.getLastInvocationTime());
             sinkInstanceStatusData.setWorkerId(assignedWorkerId);
 
             return sinkInstanceStatusData;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index 412019a184..f05eea6fd4 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -59,7 +59,7 @@
             sourceInstanceStatusData.setRunning(status.getRunning());
             sourceInstanceStatusData.setError(status.getFailureException());
             sourceInstanceStatusData.setNumRestarts(status.getNumRestarts());
-            sourceInstanceStatusData.setNumReceived(status.getNumReceived());
+            sourceInstanceStatusData.setNumReceivedFromSource(status.getNumReceived());
 
             List<ExceptionInformation> userExceptionInformationList = new LinkedList<>();
             for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
@@ -81,7 +81,8 @@
             }
             sourceInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
 
-            sourceInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
+            sourceInstanceStatusData.setNumWritten(status.getNumSuccessfullyProcessed());
+            sourceInstanceStatusData.setLastReceivedTime(status.getLastInvocationTime());
             sourceInstanceStatusData.setWorkerId(assignedWorkerId);
 
             return sourceInstanceStatusData;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 5add9a1455..f100c21a13 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -245,7 +245,7 @@ protected void getSinkStatus(String tenant, String namespace, String sinkName) t
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "sink",
-            "getstatus",
+            "status",
             "--tenant", tenant,
             "--namespace", namespace,
             "--name", sinkName
@@ -472,7 +472,7 @@ protected void getSourceStatus(String tenant, String namespace, String sourceNam
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "source",
-            "getstatus",
+            "status",
             "--tenant", tenant,
             "--namespace", namespace,
             "--name", sourceName
@@ -524,7 +524,7 @@ protected void waitForProcessingSourceMessages(String tenant,
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "source",
-            "getstatus",
+            "status",
             "--tenant", tenant,
             "--namespace", namespace,
             "--name", sourceName
@@ -544,8 +544,9 @@ protected void waitForProcessingSourceMessages(String tenant,
                     assertEquals(sourceStatus.getInstances().size(), 1);
                     assertEquals(sourceStatus.getInstances().get(0).getInstanceId(), 0);
                     assertEquals(sourceStatus.getInstances().get(0).getStatus().isRunning(), true);
-                    assertTrue(sourceStatus.getInstances().get(0).getStatus().getLastInvocationTime() > 0);
-                    assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumReceived(), numMessages);
+                    assertTrue(sourceStatus.getInstances().get(0).getStatus().getLastReceivedTime() > 0);
+                    assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumReceivedFromSource(), numMessages);
+                    assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumWritten(), numMessages);
                     assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
                     assertEquals(sourceStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
                     return;
@@ -568,7 +569,7 @@ protected void waitForProcessingSinkMessages(String tenant,
         String[] commands = {
                 PulsarCluster.ADMIN_SCRIPT,
                 "sink",
-                "getstatus",
+                "status",
                 "--tenant", tenant,
                 "--namespace", namespace,
                 "--name", sinkName
@@ -588,8 +589,9 @@ protected void waitForProcessingSinkMessages(String tenant,
                     assertEquals(sinkStatus.getInstances().size(), 1);
                     assertEquals(sinkStatus.getInstances().get(0).getInstanceId(), 0);
                     assertEquals(sinkStatus.getInstances().get(0).getStatus().isRunning(), true);
-                    assertTrue(sinkStatus.getInstances().get(0).getStatus().getLastInvocationTime() > 0);
-                    assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumReceived(), numMessages);
+                    assertTrue(sinkStatus.getInstances().get(0).getStatus().getLastReceivedTime() > 0);
+                    assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumReadFromPulsar(), numMessages);
+                    assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumWrittenToSink(), numMessages);
                     assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
                     assertEquals(sinkStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
                     return;
@@ -940,7 +942,7 @@ private static void getFunctionStatus(String functionName, int numMessages) thro
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             PulsarCluster.ADMIN_SCRIPT,
             "functions",
-            "getstatus",
+            "status",
             "--tenant", "public",
             "--namespace", "default",
             "--name", functionName


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services