You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/08/24 17:37:31 UTC

[pulsar] branch master updated: Add ability for BatchPushSource to notify errors asynchronously (#7865)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0724482  Add ability for BatchPushSource to notify errors asynchronously (#7865)
0724482 is described below

commit 0724482fa9640babe425c761a0765eea7b99d7b5
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Aug 24 10:37:04 2020 -0700

    Add ability for BatchPushSource to notify errors asynchronously (#7865)
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../BatchDataGeneratorPushSource.java              | 81 ++++++++++++++++++++++
 .../org/apache/pulsar/io/core/BatchPushSource.java | 26 +++++++
 .../apache/pulsar/io/core/BatchPushSourceTest.java | 57 +++++++++++++++
 3 files changed, 164 insertions(+)

diff --git a/pulsar-io/batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorPushSource.java b/pulsar-io/batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorPushSource.java
new file mode 100644
index 0000000..c1c2920
--- /dev/null
+++ b/pulsar-io/batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorPushSource.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.batchdatagenerator;
+
+import io.codearte.jfairy.Fairy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.BatchPushSource;
+import org.apache.pulsar.io.core.SourceContext;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+@Slf4j
+public class BatchDataGeneratorPushSource extends BatchPushSource<Person> implements Runnable {
+
+  private Fairy fairy;
+  private SourceContext sourceContext;
+  private int maxRecordsPerCycle = 10;
+
+  private ExecutorService executor = Executors.newSingleThreadExecutor();
+
+  @Override
+  public void close() {
+    executor.shutdownNow();
+  }
+
+  @Override
+  public void open(Map config, SourceContext context) throws Exception {
+    this.fairy = Fairy.create();
+    this.sourceContext = context;
+  }
+
+  @Override
+  public void discover(Consumer taskEater) throws Exception {
+    log.info("Generating one task for each instance");
+    for (int i = 0; i < sourceContext.getNumInstances(); ++i) {
+      taskEater.accept(String.format("something-%d", System.currentTimeMillis()).getBytes());
+    }
+  }
+
+  @Override
+  public void prepare(byte[] instanceSplit) throws Exception {
+    log.info("Instance " + sourceContext.getInstanceId() + " got a new discovered task {}", new String(instanceSplit));
+    executor.submit(this);
+  }
+
+  @Override
+  public void run() {
+    try {
+      for (int i = 0; i < maxRecordsPerCycle; i++) {
+        Thread.sleep(50);
+        Record<Person> record = () -> new Person(fairy.person());
+        consume(record);
+      }
+      // this task is completed
+      consume(null);
+    } catch (Exception e) {
+      notifyError(e);
+    }
+  }
+}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
index 9f44fd5..6cb5a49 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
@@ -37,6 +37,21 @@ public abstract class BatchPushSource<T> implements BatchSource<T> {
         }
     }
 
+    private static class ErrorNotifierRecord implements Record {
+        private Exception e;
+        public ErrorNotifierRecord(Exception e) {
+            this.e = e;
+        }
+        @Override
+        public Object getValue() {
+            return null;
+        }
+
+        public Exception getException() {
+            return e;
+        }
+    }
+
     private LinkedBlockingQueue<Record<T>> queue;
     private static final int DEFAULT_QUEUE_LENGTH = 1000;
     private final NullRecord nullRecord = new NullRecord();
@@ -48,6 +63,9 @@ public abstract class BatchPushSource<T> implements BatchSource<T> {
     @Override
     public Record<T> readNext() throws Exception {
         Record<T> record = queue.take();
+        if (record instanceof ErrorNotifierRecord) {
+            throw ((ErrorNotifierRecord) record).getException();
+        }
         if (record instanceof NullRecord) {
             return null;
         } else {
@@ -80,4 +98,12 @@ public abstract class BatchPushSource<T> implements BatchSource<T> {
     public int getQueueLength() {
         return DEFAULT_QUEUE_LENGTH;
     }
+
+    /**
+     * Allows the source to notify errors asynchronously
+     * @param ex
+     */
+    public void notifyError(Exception ex) {
+        consume(new ErrorNotifierRecord(ex));
+    }
 }
\ No newline at end of file
diff --git a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/BatchPushSourceTest.java b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/BatchPushSourceTest.java
new file mode 100644
index 0000000..3ac2b49
--- /dev/null
+++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/BatchPushSourceTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.core;
+
+import org.apache.pulsar.io.core.BatchPushSource;
+import org.apache.pulsar.io.core.SourceContext;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+import java.util.function.Consumer;
+
+public class BatchPushSourceTest {
+
+  BatchPushSource testBatchSource = new BatchPushSource() {
+    @Override
+    public void open(Map config, SourceContext context) throws Exception {
+
+    }
+
+    @Override
+    public void discover(Consumer taskEater) throws Exception {
+
+    }
+
+    @Override
+    public void prepare(byte[] task) throws Exception {
+
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+  };
+
+  @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "test exception")
+  public void testNotifyErrors() throws Exception {
+    testBatchSource.notifyError(new RuntimeException("test exception"));
+    testBatchSource.readNext();
+  }
+}